src/riemann.proto: Import the protocol buffer for Riemann 0.2.0.
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  *
4  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
5  * Copyright (C) 2013       Florian octo Forster
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
16  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
17  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  *
19  * Authors:
20  *   Pierre-Yves Ritschard <pyr at spootnik.org>
21  *   Florian octo Forster <octo at collectd.org>
22  */
23
24 #include "collectd.h"
25 #include "plugin.h"
26 #include "common.h"
27 #include "configfile.h"
28 #include "utils_cache.h"
29 #include "riemann.pb-c.h"
30
31 #include <sys/socket.h>
32 #include <arpa/inet.h>
33 #include <errno.h>
34 #include <netdb.h>
35 #include <inttypes.h>
36 #include <pthread.h>
37
38 #define RIEMANN_HOST            "localhost"
39 #define RIEMANN_PORT            "5555"
40
41 struct riemann_host {
42         char                    *name;
43 #define F_CONNECT                0x01
44         uint8_t                  flags;
45         pthread_mutex_t          lock;
46         _Bool                    store_rates;
47         _Bool                    always_append_ds;
48         char                    *node;
49         char                    *service;
50         _Bool                    use_tcp;
51         int                      s;
52
53         int                      reference_count;
54 };
55
56 static char     **riemann_tags;
57 static size_t     riemann_tags_num;
58
59 static int      riemann_send(struct riemann_host *, Msg const *);
60 static int      riemann_notification(const notification_t *, user_data_t *);
61 static int      riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
62 static int      riemann_connect(struct riemann_host *);
63 static int      riemann_disconnect (struct riemann_host *host);
64 static void     riemann_free(void *);
65 static int      riemann_config_node(oconfig_item_t *);
66 static int      riemann_config(oconfig_item_t *);
67 void    module_register(void);
68
69 static void riemann_event_protobuf_free (Event *event) /* {{{ */
70 {
71         if (event == NULL)
72                 return;
73
74         sfree (event->state);
75         sfree (event->service);
76         sfree (event->host);
77         sfree (event->description);
78
79         strarray_free (event->tags, event->n_tags);
80         event->tags = NULL;
81         event->n_tags = 0;
82
83         sfree (event);
84 } /* }}} void riemann_event_protobuf_free */
85
86 static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */
87 {
88         size_t i;
89
90         if (msg == NULL)
91                 return;
92
93         for (i = 0; i < msg->n_events; i++)
94         {
95                 riemann_event_protobuf_free (msg->events[i]);
96                 msg->events[i] = NULL;
97         }
98
99         sfree (msg->events);
100         msg->n_events = 0;
101
102         sfree (msg);
103 } /* }}} void riemann_msg_protobuf_free */
104
105 static int
106 riemann_send(struct riemann_host *host, Msg const *msg)
107 {
108         u_char *buffer;
109         size_t  buffer_len;
110         int status;
111
112         pthread_mutex_lock (&host->lock);
113
114         status = riemann_connect (host);
115         if (status != 0)
116         {
117                 pthread_mutex_unlock (&host->lock);
118                 return status;
119         }
120
121         buffer_len = msg__get_packed_size(msg);
122         if (host->use_tcp)
123                 buffer_len += 4;
124
125         buffer = malloc (buffer_len);
126         if (buffer == NULL) {
127                 pthread_mutex_unlock (&host->lock);
128                 ERROR ("write_riemann plugin: malloc failed.");
129                 return ENOMEM;
130         }
131         memset (buffer, 0, buffer_len);
132
133         if (host->use_tcp)
134         {
135                 uint32_t length = htonl ((uint32_t) (buffer_len - 4));
136                 memcpy (buffer, &length, 4);
137                 msg__pack(msg, buffer + 4);
138         }
139         else
140         {
141                 msg__pack(msg, buffer);
142         }
143
144         status = (int) swrite (host->s, buffer, buffer_len);
145         if (status != 0)
146         {
147                 char errbuf[1024];
148
149                 riemann_disconnect (host);
150                 pthread_mutex_unlock (&host->lock);
151
152                 ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
153                                 (host->node != NULL) ? host->node : RIEMANN_HOST,
154                                 (host->service != NULL) ? host->service : RIEMANN_PORT,
155                                 sstrerror (errno, errbuf, sizeof (errbuf)));
156                 sfree (buffer);
157                 return -1;
158         }
159
160         pthread_mutex_unlock (&host->lock);
161         sfree (buffer);
162         return 0;
163 }
164
165 static int riemann_event_add_tag (Event *event, /* {{{ */
166                 char const *format, ...)
167 {
168         va_list ap;
169         char buffer[1024];
170         size_t ret;
171
172         va_start (ap, format);
173         ret = vsnprintf (buffer, sizeof (buffer), format, ap);
174         if (ret >= sizeof (buffer))
175                 ret = sizeof (buffer) - 1;
176         buffer[ret] = 0;
177         va_end (ap);
178
179         return (strarray_add (&event->tags, &event->n_tags, buffer));
180 } /* }}} int riemann_event_add_tag */
181
182 static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */
183                 notification_t const *n)
184 {
185         Msg *msg;
186         Event *event;
187         char service_buffer[6 * DATA_MAX_NAME_LEN];
188         char const *severity;
189         notification_meta_t *meta;
190         int i;
191
192         msg = malloc (sizeof (*msg));
193         if (msg == NULL)
194         {
195                 ERROR ("write_riemann plugin: malloc failed.");
196                 return (NULL);
197         }
198         memset (msg, 0, sizeof (*msg));
199         msg__init (msg);
200
201         msg->events = malloc (sizeof (*msg->events));
202         if (msg->events == NULL)
203         {
204                 ERROR ("write_riemann plugin: malloc failed.");
205                 sfree (msg);
206                 return (NULL);
207         }
208
209         event = malloc (sizeof (*event));
210         if (event == NULL)
211         {
212                 ERROR ("write_riemann plugin: malloc failed.");
213                 sfree (msg->events);
214                 sfree (msg);
215                 return (NULL);
216         }
217         memset (event, 0, sizeof (*event));
218         event__init (event);
219
220         msg->events[0] = event;
221         msg->n_events = 1;
222
223         event->host = strdup (n->host);
224         event->time = CDTIME_T_TO_TIME_T (n->time);
225         event->has_time = 1;
226
227         switch (n->severity)
228         {
229                 case NOTIF_OKAY:        severity = "ok"; break;
230                 case NOTIF_WARNING:     severity = "warning"; break;
231                 case NOTIF_FAILURE:     severity = "critical"; break;
232                 default:                severity = "unknown";
233         }
234         event->state = strdup (severity);
235
236         riemann_event_add_tag (event, "notification");
237         if (n->plugin[0] != 0)
238                 riemann_event_add_tag (event, "plugin:%s", n->plugin);
239         if (n->plugin_instance[0] != 0)
240                 riemann_event_add_tag (event, "plugin_instance:%s",
241                                 n->plugin_instance);
242
243         if (n->type[0] != 0)
244                 riemann_event_add_tag (event, "type:%s", n->type);
245         if (n->type_instance[0] != 0)
246                 riemann_event_add_tag (event, "type_instance:%s",
247                                 n->type_instance);
248
249         for (i = 0; i < riemann_tags_num; i++)
250                 riemann_event_add_tag (event, "%s", riemann_tags[i]);
251
252         format_name (service_buffer, sizeof (service_buffer),
253                         /* host = */ "", n->plugin, n->plugin_instance,
254                         n->type, n->type_instance);
255         event->service = strdup (&service_buffer[1]);
256
257         /* Pull in values from threshold */
258         for (meta = n->meta; meta != NULL; meta = meta->next)
259         {
260                 if (strcasecmp ("CurrentValue", meta->name) != 0)
261                         continue;
262
263                 event->metric_d = meta->nm_value.nm_double;
264                 event->has_metric_d = 1;
265                 break;
266         }
267
268         DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "
269                         "host = \"%s\", service = \"%s\", state = \"%s\"",
270                         event->host, event->service, event->state);
271         return (msg);
272 } /* }}} Msg *riemann_notification_to_protobuf */
273
274 static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */
275                 data_set_t const *ds,
276                 value_list_t const *vl, size_t index,
277                 gauge_t const *rates)
278 {
279         Event *event;
280         char name_buffer[5 * DATA_MAX_NAME_LEN];
281         char service_buffer[6 * DATA_MAX_NAME_LEN];
282         int i;
283
284         event = malloc (sizeof (*event));
285         if (event == NULL)
286         {
287                 ERROR ("write_riemann plugin: malloc failed.");
288                 return (NULL);
289         }
290         memset (event, 0, sizeof (*event));
291         event__init (event);
292
293         event->host = strdup (vl->host);
294         event->time = CDTIME_T_TO_TIME_T (vl->time);
295         event->has_time = 1;
296         event->ttl = CDTIME_T_TO_TIME_T (2 * vl->interval);
297         event->has_ttl = 1;
298
299         riemann_event_add_tag (event, "plugin:%s", vl->plugin);
300         if (vl->plugin_instance[0] != 0)
301                 riemann_event_add_tag (event, "plugin_instance:%s",
302                                 vl->plugin_instance);
303
304         riemann_event_add_tag (event, "type:%s", vl->type);
305         if (vl->type_instance[0] != 0)
306                 riemann_event_add_tag (event, "type_instance:%s",
307                                 vl->type_instance);
308
309         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
310         {
311                 riemann_event_add_tag (event, "ds_type:%s:rate",
312                                 DS_TYPE_TO_STRING(ds->ds[index].type));
313         }
314         else
315         {
316                 riemann_event_add_tag (event, "ds_type:%s",
317                                 DS_TYPE_TO_STRING(ds->ds[index].type));
318         }
319         riemann_event_add_tag (event, "ds_name:%s", ds->ds[index].name);
320         riemann_event_add_tag (event, "ds_index:%zu", index);
321
322         for (i = 0; i < riemann_tags_num; i++)
323                 riemann_event_add_tag (event, "%s", riemann_tags[i]);
324
325         if (ds->ds[index].type == DS_TYPE_GAUGE)
326         {
327                 event->has_metric_d = 1;
328                 event->metric_d = (double) vl->values[index].gauge;
329         }
330         else if (rates != NULL)
331         {
332                 event->has_metric_d = 1;
333                 event->metric_d = (double) rates[index];
334         }
335         else
336         {
337                 event->has_metric_sint64 = 1;
338                 if (ds->ds[index].type == DS_TYPE_DERIVE)
339                         event->metric_sint64 = (int64_t) vl->values[index].derive;
340                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
341                         event->metric_sint64 = (int64_t) vl->values[index].absolute;
342                 else
343                         event->metric_sint64 = (int64_t) vl->values[index].counter;
344         }
345
346         format_name (name_buffer, sizeof (name_buffer),
347                         /* host = */ "", vl->plugin, vl->plugin_instance,
348                         vl->type, vl->type_instance);
349         if (host->always_append_ds || (ds->ds_num > 1))
350                 ssnprintf (service_buffer, sizeof (service_buffer),
351                                 "%s/%s", &name_buffer[1], ds->ds[index].name);
352         else
353                 sstrncpy (service_buffer, &name_buffer[1],
354                                 sizeof (service_buffer));
355
356         event->service = strdup (service_buffer);
357
358         DEBUG ("write_riemann plugin: Successfully created protobuf for metric: "
359                         "host = \"%s\", service = \"%s\"",
360                         event->host, event->service);
361         return (event);
362 } /* }}} Event *riemann_value_to_protobuf */
363
364 static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
365                 data_set_t const *ds,
366                 value_list_t const *vl)
367 {
368         Msg *msg;
369         size_t i;
370         gauge_t *rates = NULL;
371
372         /* Initialize the Msg structure. */
373         msg = malloc (sizeof (*msg));
374         if (msg == NULL)
375         {
376                 ERROR ("write_riemann plugin: malloc failed.");
377                 return (NULL);
378         }
379         memset (msg, 0, sizeof (*msg));
380         msg__init (msg);
381
382         /* Set up events. First, the list of pointers. */
383         msg->n_events = (size_t) vl->values_len;
384         msg->events = calloc (msg->n_events, sizeof (*msg->events));
385         if (msg->events == NULL)
386         {
387                 ERROR ("write_riemann plugin: calloc failed.");
388                 riemann_msg_protobuf_free (msg);
389                 return (NULL);
390         }
391
392         if (host->store_rates)
393         {
394                 rates = uc_get_rate (ds, vl);
395                 if (rates == NULL)
396                 {
397                         ERROR ("write_riemann plugin: uc_get_rate failed.");
398                         riemann_msg_protobuf_free (msg);
399                         return (NULL);
400                 }
401         }
402
403         for (i = 0; i < msg->n_events; i++)
404         {
405                 msg->events[i] = riemann_value_to_protobuf (host, ds, vl,
406                                 (int) i, rates);
407                 if (msg->events[i] == NULL)
408                 {
409                         riemann_msg_protobuf_free (msg);
410                         sfree (rates);
411                         return (NULL);
412                 }
413         }
414
415         sfree (rates);
416         return (msg);
417 } /* }}} Msg *riemann_value_list_to_protobuf */
418
419 static int
420 riemann_notification(const notification_t *n, user_data_t *ud)
421 {
422         int                      status;
423         struct riemann_host     *host = ud->data;
424         Msg                     *msg;
425
426         msg = riemann_notification_to_protobuf (host, n);
427         if (msg == NULL)
428                 return (-1);
429
430         status = riemann_send (host, msg);
431         if (status != 0)
432                 ERROR ("write_riemann plugin: riemann_send failed with status %i",
433                                 status);
434
435         riemann_msg_protobuf_free (msg);
436         return (status);
437 } /* }}} int riemann_notification */
438
439 static int
440 riemann_write(const data_set_t *ds,
441               const value_list_t *vl,
442               user_data_t *ud)
443 {
444         int                      status;
445         struct riemann_host     *host = ud->data;
446         Msg                     *msg;
447
448         msg = riemann_value_list_to_protobuf (host, ds, vl);
449         if (msg == NULL)
450                 return (-1);
451
452         status = riemann_send (host, msg);
453         if (status != 0)
454                 ERROR ("write_riemann plugin: riemann_send failed with status %i",
455                                 status);
456
457         riemann_msg_protobuf_free (msg);
458         return status;
459 }
460
461 /* host->lock must be held when calling this function. */
462 static int
463 riemann_connect(struct riemann_host *host)
464 {
465         int                      e;
466         struct addrinfo         *ai, *res, hints;
467         char const              *node;
468         char const              *service;
469
470         if (host->flags & F_CONNECT)
471                 return 0;
472
473         memset(&hints, 0, sizeof(hints));
474         memset(&service, 0, sizeof(service));
475         hints.ai_family = AF_UNSPEC;
476         hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM;
477 #ifdef AI_ADDRCONFIG
478         hints.ai_flags |= AI_ADDRCONFIG;
479 #endif
480
481         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
482         service = (host->service != NULL) ? host->service : RIEMANN_PORT;
483
484         if ((e = getaddrinfo(node, service, &hints, &res)) != 0) {
485                 ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s",
486                         node, gai_strerror(e));
487                 return -1;
488         }
489
490         host->s = -1;
491         for (ai = res; ai != NULL; ai = ai->ai_next) {
492                 if ((host->s = socket(ai->ai_family,
493                                       ai->ai_socktype,
494                                       ai->ai_protocol)) == -1) {
495                         continue;
496                 }
497
498                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
499                         close(host->s);
500                         host->s = -1;
501                         continue;
502                 }
503
504                 host->flags |= F_CONNECT;
505                 DEBUG("write_riemann plugin: got a succesful connection for: %s:%s",
506                                 node, service);
507                 break;
508         }
509
510         freeaddrinfo(res);
511
512         if (host->s < 0) {
513                 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s",
514                                 node, service);
515                 return -1;
516         }
517         return 0;
518 }
519
520 /* host->lock must be held when calling this function. */
521 static int
522 riemann_disconnect (struct riemann_host *host)
523 {
524         if ((host->flags & F_CONNECT) == 0)
525                 return (0);
526
527         close (host->s);
528         host->s = -1;
529         host->flags &= ~F_CONNECT;
530
531         return (0);
532 }
533
534 static void
535 riemann_free(void *p)
536 {
537         struct riemann_host     *host = p;
538
539         if (host == NULL)
540                 return;
541
542         pthread_mutex_lock (&host->lock);
543
544         host->reference_count--;
545         if (host->reference_count > 0)
546         {
547                 pthread_mutex_unlock (&host->lock);
548                 return;
549         }
550
551         riemann_disconnect (host);
552
553         sfree(host->service);
554         pthread_mutex_destroy (&host->lock);
555         sfree(host);
556 }
557
558 static int
559 riemann_config_node(oconfig_item_t *ci)
560 {
561         struct riemann_host     *host = NULL;
562         int                      status = 0;
563         int                      i;
564         oconfig_item_t          *child;
565         char                     callback_name[DATA_MAX_NAME_LEN];
566         user_data_t              ud;
567
568         if ((host = calloc(1, sizeof (*host))) == NULL) {
569                 ERROR ("write_riemann plugin: calloc failed.");
570                 return ENOMEM;
571         }
572         pthread_mutex_init (&host->lock, NULL);
573         host->reference_count = 1;
574         host->node = NULL;
575         host->service = NULL;
576         host->store_rates = 1;
577         host->always_append_ds = 0;
578         host->use_tcp = 0;
579
580         status = cf_util_get_string (ci, &host->name);
581         if (status != 0) {
582                 WARNING("write_riemann plugin: Required host name is missing.");
583                 riemann_free (host);
584                 return -1;
585         }
586
587         for (i = 0; i < ci->children_num; i++) {
588                 /*
589                  * The code here could be simplified but makes room
590                  * for easy adding of new options later on.
591                  */
592                 child = &ci->children[i];
593                 status = 0;
594
595                 if (strcasecmp ("Host", child->key) == 0) {
596                         status = cf_util_get_string (child, &host->node);
597                         if (status != 0)
598                                 break;
599                 } else if (strcasecmp ("Port", child->key) == 0) {
600                         status = cf_util_get_service (child, &host->service);
601                         if (status != 0) {
602                                 ERROR ("write_riemann plugin: Invalid argument "
603                                                 "configured for the \"Port\" "
604                                                 "option.");
605                                 break;
606                         }
607                 } else if (strcasecmp ("Protocol", child->key) == 0) {
608                         char tmp[16];
609                         status = cf_util_get_string_buffer (child,
610                                         tmp, sizeof (tmp));
611                         if (status != 0)
612                         {
613                                 ERROR ("write_riemann plugin: cf_util_get_"
614                                                 "string_buffer failed with "
615                                                 "status %i.", status);
616                                 break;
617                         }
618
619                         if (strcasecmp ("UDP", tmp) == 0)
620                                 host->use_tcp = 0;
621                         else if (strcasecmp ("TCP", tmp) == 0)
622                                 host->use_tcp = 1;
623                         else
624                                 WARNING ("write_riemann plugin: The value "
625                                                 "\"%s\" is not valid for the "
626                                                 "\"Protocol\" option. Use "
627                                                 "either \"UDP\" or \"TCP\".",
628                                                 tmp);
629                 } else if (strcasecmp ("StoreRates", child->key) == 0) {
630                         status = cf_util_get_boolean (child, &host->store_rates);
631                         if (status != 0)
632                                 break;
633                 } else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) {
634                         status = cf_util_get_boolean (child,
635                                         &host->always_append_ds);
636                         if (status != 0)
637                                 break;
638                 } else {
639                         WARNING("write_riemann plugin: ignoring unknown config "
640                                 "option: \"%s\"", child->key);
641                 }
642         }
643         if (status != 0) {
644                 riemann_free (host);
645                 return status;
646         }
647
648         ssnprintf (callback_name, sizeof (callback_name), "write_riemann/%s",
649                         host->name);
650         ud.data = host;
651         ud.free_func = riemann_free;
652
653         pthread_mutex_lock (&host->lock);
654
655         status = plugin_register_write (callback_name, riemann_write, &ud);
656         if (status != 0)
657                 WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
658                                 "failed with status %i.",
659                                 callback_name, status);
660         else /* success */
661                 host->reference_count++;
662
663         status = plugin_register_notification (callback_name,
664                         riemann_notification, &ud);
665         if (status != 0)
666                 WARNING ("write_riemann plugin: plugin_register_notification (\"%s\") "
667                                 "failed with status %i.",
668                                 callback_name, status);
669         else /* success */
670                 host->reference_count++;
671
672         if (host->reference_count <= 1)
673         {
674                 /* Both callbacks failed => free memory.
675                  * We need to unlock here, because riemann_free() will lock.
676                  * This is not a race condition, because we're the only one
677                  * holding a reference. */
678                 pthread_mutex_unlock (&host->lock);
679                 riemann_free (host);
680                 return (-1);
681         }
682
683         host->reference_count--;
684         pthread_mutex_unlock (&host->lock);
685
686         return status;
687 }
688
689 static int
690 riemann_config(oconfig_item_t *ci)
691 {
692         int              i;
693         oconfig_item_t  *child;
694         int              status;
695
696         for (i = 0; i < ci->children_num; i++)  {
697                 child = &ci->children[i];
698
699                 if (strcasecmp("Node", child->key) == 0) {
700                         riemann_config_node (child);
701                 } else if (strcasecmp(child->key, "tag") == 0) {
702                         char *tmp = NULL;
703                         status = cf_util_get_string(child, &tmp);
704                         if (status != 0)
705                                 continue;
706
707                         strarray_add (&riemann_tags, &riemann_tags_num, tmp);
708                         DEBUG("write_riemann plugin: Got tag: %s", tmp);
709                         sfree (tmp);
710                 } else {
711                         WARNING ("write_riemann plugin: Ignoring unknown "
712                                  "configuration option \"%s\" at top level.",
713                                  child->key);
714                 }
715         }
716         return (0);
717 }
718
719 void
720 module_register(void)
721 {
722         plugin_register_complex_config ("write_riemann", riemann_config);
723 }
724
725 /* vim: set sw=8 sts=8 ts=8 noet : */