Merge branch 'collectd-5.7'
[collectd.git] / src / write_sensu.c
1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26
27 #define _GNU_SOURCE
28
29 #include "collectd.h"
30
31 #include <arpa/inet.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <netdb.h>
35 #include <stddef.h>
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_cache.h"
39
40 #include <stdlib.h>
41 #define SENSU_HOST "localhost"
42 #define SENSU_PORT "3030"
43
44 #ifdef HAVE_ASPRINTF
45 #define my_asprintf asprintf
46 #define my_vasprintf vasprintf
47 #else
48 /*
49  * asprintf() is available from Solaris 10 update 11.
50  * For older versions, use asprintf() portable implementation from
51  * https://github.com/littlstar/asprintf.c/blob/master/
52  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
53  */
54
55 static int my_vasprintf(char **str, const char *fmt, va_list args) {
56   int size = 0;
57   va_list tmpa;
58   // copy
59   va_copy(tmpa, args);
60   // apply variadic arguments to
61   // sprintf with format to get size
62   size = vsnprintf(NULL, size, fmt, tmpa);
63   // toss args
64   va_end(tmpa);
65   // return -1 to be compliant if
66   // size is less than 0
67   if (size < 0) {
68     return -1;
69   }
70   // alloc with size plus 1 for `\0'
71   *str = (char *)malloc(size + 1);
72   // return -1 to be compliant
73   // if pointer is `NULL'
74   if (NULL == *str) {
75     return -1;
76   }
77   // format string with original
78   // variadic arguments and set new size
79   size = vsprintf(*str, fmt, args);
80   return size;
81 }
82
83 static int my_asprintf(char **str, const char *fmt, ...) {
84   int size = 0;
85   va_list args;
86   // init variadic argumens
87   va_start(args, fmt);
88   // format and get size
89   size = my_vasprintf(str, fmt, args);
90   // toss args
91   va_end(args);
92   return size;
93 }
94
95 #endif
96
97 struct str_list {
98   int nb_strs;
99   char **strs;
100 };
101
102 struct sensu_host {
103   char *name;
104   char *event_service_prefix;
105   struct str_list metric_handlers;
106   struct str_list notification_handlers;
107 #define F_READY 0x01
108   uint8_t flags;
109   pthread_mutex_t lock;
110   _Bool notifications;
111   _Bool metrics;
112   _Bool store_rates;
113   _Bool always_append_ds;
114   char *separator;
115   char *node;
116   char *service;
117   int s;
118   struct addrinfo *res;
119   int reference_count;
120 };
121
122 static char *sensu_tags = NULL;
123 static char **sensu_attrs = NULL;
124 static size_t sensu_attrs_num;
125
126 static int add_str_to_list(struct str_list *strs,
127                            const char *str_to_add) /* {{{ */
128 {
129   char **old_strs_ptr = strs->strs;
130   char *newstr = strdup(str_to_add);
131   if (newstr == NULL) {
132     ERROR("write_sensu plugin: Unable to alloc memory");
133     return -1;
134   }
135   strs->strs = realloc(strs->strs, sizeof(char *) * (strs->nb_strs + 1));
136   if (strs->strs == NULL) {
137     strs->strs = old_strs_ptr;
138     free(newstr);
139     ERROR("write_sensu plugin: Unable to alloc memory");
140     return -1;
141   }
142   strs->strs[strs->nb_strs] = newstr;
143   strs->nb_strs++;
144   return 0;
145 }
146 /* }}} int add_str_to_list */
147
148 static void free_str_list(struct str_list *strs) /* {{{ */
149 {
150   for (int i = 0; i < strs->nb_strs; i++)
151     free(strs->strs[i]);
152   free(strs->strs);
153 }
154 /* }}} void free_str_list */
155
156 static int sensu_connect(struct sensu_host *host) /* {{{ */
157 {
158   int e;
159   char const *node;
160   char const *service;
161
162   // Resolve the target if we haven't done already
163   if (!(host->flags & F_READY)) {
164     memset(&service, 0, sizeof(service));
165     host->res = NULL;
166
167     node = (host->node != NULL) ? host->node : SENSU_HOST;
168     service = (host->service != NULL) ? host->service : SENSU_PORT;
169
170     struct addrinfo ai_hints = {.ai_family = AF_INET,
171                                 .ai_flags = AI_ADDRCONFIG,
172                                 .ai_socktype = SOCK_STREAM};
173
174     if ((e = getaddrinfo(node, service, &ai_hints, &(host->res))) != 0) {
175       ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s", node,
176             gai_strerror(e));
177       return -1;
178     }
179     DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s", node,
180           service);
181     host->flags |= F_READY;
182   }
183
184   struct linger so_linger;
185   host->s = -1;
186   for (struct addrinfo *ai = host->res; ai != NULL; ai = ai->ai_next) {
187     // create the socket
188     if ((host->s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) ==
189         -1) {
190       continue;
191     }
192
193     // Set very low close() lingering
194     so_linger.l_onoff = 1;
195     so_linger.l_linger = 3;
196     if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger,
197                    sizeof so_linger) != 0)
198       WARNING("write_sensu plugin: failed to set socket close() lingering");
199
200     set_sock_opts(host->s);
201
202     // connect the socket
203     if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
204       close(host->s);
205       host->s = -1;
206       continue;
207     }
208     DEBUG("write_sensu plugin: connected");
209     break;
210   }
211
212   if (host->s < 0) {
213     WARNING("write_sensu plugin: Unable to connect to sensu client");
214     return -1;
215   }
216   return 0;
217 } /* }}} int sensu_connect */
218
219 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
220 {
221   if (host->s != -1)
222     close(host->s);
223   host->s = -1;
224
225 } /* }}} void sensu_close_socket */
226
227 static char *build_json_str_list(const char *tag,
228                                  struct str_list const *list) /* {{{ */
229 {
230   int res;
231   char *ret_str = NULL;
232   char *temp_str;
233   if (list->nb_strs == 0) {
234     ret_str = malloc(sizeof(char));
235     if (ret_str == NULL) {
236       ERROR("write_sensu plugin: Unable to alloc memory");
237       return NULL;
238     }
239     ret_str[0] = '\0';
240   }
241
242   res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
243   if (res == -1) {
244     ERROR("write_sensu plugin: Unable to alloc memory");
245     free(ret_str);
246     return NULL;
247   }
248   for (int i = 1; i < list->nb_strs; i++) {
249     res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
250     free(temp_str);
251     if (res == -1) {
252       ERROR("write_sensu plugin: Unable to alloc memory");
253       return NULL;
254     }
255     temp_str = ret_str;
256   }
257   res = my_asprintf(&ret_str, "%s]", temp_str);
258   free(temp_str);
259   if (res == -1) {
260     ERROR("write_sensu plugin: Unable to alloc memory");
261     return NULL;
262   }
263
264   return ret_str;
265 } /* }}} char *build_json_str_list*/
266
267 static int sensu_format_name2(char *ret, int ret_len, const char *hostname,
268                               const char *plugin, const char *plugin_instance,
269                               const char *type, const char *type_instance,
270                               const char *separator) {
271   char *buffer;
272   size_t buffer_size;
273
274   buffer = ret;
275   buffer_size = (size_t)ret_len;
276
277 #define APPEND(str)                                                            \
278   do {                                                                         \
279     size_t l = strlen(str);                                                    \
280     if (l >= buffer_size)                                                      \
281       return (ENOBUFS);                                                        \
282     memcpy(buffer, (str), l);                                                  \
283     buffer += l;                                                               \
284     buffer_size -= l;                                                          \
285   } while (0)
286
287   assert(plugin != NULL);
288   assert(type != NULL);
289
290   APPEND(hostname);
291   APPEND(separator);
292   APPEND(plugin);
293   if ((plugin_instance != NULL) && (plugin_instance[0] != 0)) {
294     APPEND("-");
295     APPEND(plugin_instance);
296   }
297   APPEND(separator);
298   APPEND(type);
299   if ((type_instance != NULL) && (type_instance[0] != 0)) {
300     APPEND("-");
301     APPEND(type_instance);
302   }
303   assert(buffer_size > 0);
304   buffer[0] = 0;
305
306 #undef APPEND
307   return (0);
308 } /* int sensu_format_name2 */
309
310 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
311 {
312   int len = strlen(orig_name);
313   for (int i = 0; i < len; i++) {
314     // some plugins like ipmi generate special characters in metric name
315     switch (orig_name[i]) {
316     case '(':
317       orig_name[i] = '_';
318       break;
319     case ')':
320       orig_name[i] = '_';
321       break;
322     case ' ':
323       orig_name[i] = '_';
324       break;
325     case '"':
326       orig_name[i] = '_';
327       break;
328     case '\'':
329       orig_name[i] = '_';
330       break;
331     case '+':
332       orig_name[i] = '_';
333       break;
334     }
335   }
336 } /* }}} char *replace_sensu_name_reserved */
337
338 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
339                                  data_set_t const *ds, value_list_t const *vl,
340                                  size_t index, gauge_t const *rates,
341                                  int status) {
342   char name_buffer[5 * DATA_MAX_NAME_LEN];
343   char service_buffer[6 * DATA_MAX_NAME_LEN];
344   char *ret_str;
345   char *temp_str;
346   char *value_str;
347   int res;
348   // First part of the JSON string
349   const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
350
351   char *handlers_str =
352       build_json_str_list("handlers", &(host->metric_handlers));
353   if (handlers_str == NULL) {
354     ERROR("write_sensu plugin: Unable to alloc memory");
355     return NULL;
356   }
357
358   // incorporate the handlers
359   if (strlen(handlers_str) == 0) {
360     free(handlers_str);
361     ret_str = strdup(part1);
362     if (ret_str == NULL) {
363       ERROR("write_sensu plugin: Unable to alloc memory");
364       return NULL;
365     }
366   } else {
367     res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
368     free(handlers_str);
369     if (res == -1) {
370       ERROR("write_sensu plugin: Unable to alloc memory");
371       return NULL;
372     }
373   }
374
375   // incorporate the plugin name information
376   res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
377                     vl->plugin);
378   free(ret_str);
379   if (res == -1) {
380     ERROR("write_sensu plugin: Unable to alloc memory");
381     return NULL;
382   }
383   ret_str = temp_str;
384
385   // incorporate the plugin type
386   res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str,
387                     vl->type);
388   free(ret_str);
389   if (res == -1) {
390     ERROR("write_sensu plugin: Unable to alloc memory");
391     return NULL;
392   }
393   ret_str = temp_str;
394
395   // incorporate the plugin instance if any
396   if (vl->plugin_instance[0] != 0) {
397     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
398                       ret_str, vl->plugin_instance);
399     free(ret_str);
400     if (res == -1) {
401       ERROR("write_sensu plugin: Unable to alloc memory");
402       return NULL;
403     }
404     ret_str = temp_str;
405   }
406
407   // incorporate the plugin type instance if any
408   if (vl->type_instance[0] != 0) {
409     res =
410         my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
411                     ret_str, vl->type_instance);
412     free(ret_str);
413     if (res == -1) {
414       ERROR("write_sensu plugin: Unable to alloc memory");
415       return NULL;
416     }
417     ret_str = temp_str;
418   }
419
420   // incorporate the data source type
421   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
422     char ds_type[DATA_MAX_NAME_LEN];
423     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
424               DS_TYPE_TO_STRING(ds->ds[index].type));
425     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
426                       ret_str, ds_type);
427     free(ret_str);
428     if (res == -1) {
429       ERROR("write_sensu plugin: Unable to alloc memory");
430       return NULL;
431     }
432     ret_str = temp_str;
433   } else {
434     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
435                       ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
436     free(ret_str);
437     if (res == -1) {
438       ERROR("write_sensu plugin: Unable to alloc memory");
439       return NULL;
440     }
441     ret_str = temp_str;
442   }
443
444   // incorporate the data source name
445   res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"",
446                     ret_str, ds->ds[index].name);
447   free(ret_str);
448   if (res == -1) {
449     ERROR("write_sensu plugin: Unable to alloc memory");
450     return NULL;
451   }
452   ret_str = temp_str;
453
454   // incorporate the data source index
455   {
456     char ds_index[DATA_MAX_NAME_LEN];
457     ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
458     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s",
459                       ret_str, ds_index);
460     free(ret_str);
461     if (res == -1) {
462       ERROR("write_sensu plugin: Unable to alloc memory");
463       return NULL;
464     }
465     ret_str = temp_str;
466   }
467
468   // add key value attributes from config if any
469   for (size_t i = 0; i < sensu_attrs_num; i += 2) {
470     res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
471                       sensu_attrs[i + 1]);
472     free(ret_str);
473     if (res == -1) {
474       ERROR("write_sensu plugin: Unable to alloc memory");
475       return NULL;
476     }
477     ret_str = temp_str;
478   }
479
480   // incorporate sensu tags from config if any
481   if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
482     res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
483     free(ret_str);
484     if (res == -1) {
485       ERROR("write_sensu plugin: Unable to alloc memory");
486       return NULL;
487     }
488     ret_str = temp_str;
489   }
490
491   // calculate the value and set to a string
492   if (ds->ds[index].type == DS_TYPE_GAUGE) {
493     res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
494     if (res == -1) {
495       free(ret_str);
496       ERROR("write_sensu plugin: Unable to alloc memory");
497       return NULL;
498     }
499   } else if (rates != NULL) {
500     res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
501     if (res == -1) {
502       free(ret_str);
503       ERROR("write_sensu plugin: Unable to alloc memory");
504       return NULL;
505     }
506   } else {
507     if (ds->ds[index].type == DS_TYPE_DERIVE) {
508       res = my_asprintf(&value_str, "%" PRIi64, vl->values[index].derive);
509       if (res == -1) {
510         free(ret_str);
511         ERROR("write_sensu plugin: Unable to alloc memory");
512         return NULL;
513       }
514     } else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
515       res = my_asprintf(&value_str, "%" PRIu64, vl->values[index].absolute);
516       if (res == -1) {
517         free(ret_str);
518         ERROR("write_sensu plugin: Unable to alloc memory");
519         return NULL;
520       }
521     } else {
522       res = my_asprintf(&value_str, "%llu", vl->values[index].counter);
523       if (res == -1) {
524         free(ret_str);
525         ERROR("write_sensu plugin: Unable to alloc memory");
526         return NULL;
527       }
528     }
529   }
530
531   // Generate the full service name
532   sensu_format_name2(name_buffer, sizeof(name_buffer), vl->host, vl->plugin,
533                      vl->plugin_instance, vl->type, vl->type_instance,
534                      host->separator);
535   if (host->always_append_ds || (ds->ds_num > 1)) {
536     if (host->event_service_prefix == NULL)
537       ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s", name_buffer,
538                 ds->ds[index].name);
539     else
540       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
541                 host->event_service_prefix, name_buffer, ds->ds[index].name);
542   } else {
543     if (host->event_service_prefix == NULL)
544       sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
545     else
546       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
547                 host->event_service_prefix, name_buffer);
548   }
549
550   // Replace collectd sensor name reserved characters so that time series DB is
551   // happy
552   in_place_replace_sensu_name_reserved(service_buffer);
553
554   // finalize the buffer by setting the output and closing curly bracket
555   res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n", ret_str,
556                     service_buffer, value_str,
557                     (long long)CDTIME_T_TO_TIME_T(vl->time));
558   free(ret_str);
559   free(value_str);
560   if (res == -1) {
561     ERROR("write_sensu plugin: Unable to alloc memory");
562     return NULL;
563   }
564   ret_str = temp_str;
565
566   DEBUG("write_sensu plugin: Successfully created json for metric: "
567         "host = \"%s\", service = \"%s\"",
568         vl->host, service_buffer);
569   return ret_str;
570 } /* }}} char *sensu_value_to_json */
571
572 /*
573  * Uses replace_str2() implementation from
574  * http://creativeandcritical.net/str-replace-c/
575  * copyright (c) Laird Shaw, under public domain.
576  */
577 static char *replace_str(const char *str, const char *old, /* {{{ */
578                          const char *new) {
579   char *ret, *r;
580   const char *p, *q;
581   size_t oldlen = strlen(old);
582   size_t count = strlen(new);
583   size_t retlen;
584   size_t newlen = count;
585   int samesize = (oldlen == newlen);
586
587   if (!samesize) {
588     for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
589       count++;
590     /* This is undefined if p - str > PTRDIFF_MAX */
591     retlen = p - str + strlen(p) + count * (newlen - oldlen);
592   } else
593     retlen = strlen(str);
594
595   ret = calloc(1, retlen + 1);
596   if (ret == NULL)
597     return NULL;
598   // added to original: not optimized, but keeps valgrind happy.
599
600   r = ret;
601   p = str;
602   while (1) {
603     /* If the old and new strings are different lengths - in other
604      * words we have already iterated through with strstr above,
605      * and thus we know how many times we need to call it - then we
606      * can avoid the final (potentially lengthy) call to strstr,
607      * which we already know is going to return NULL, by
608      * decrementing and checking count.
609      */
610     if (!samesize && !count--)
611       break;
612     /* Otherwise i.e. when the old and new strings are the same
613      * length, and we don't know how many times to call strstr,
614      * we must check for a NULL return here (we check it in any
615      * event, to avoid further conditions, and because there's
616      * no harm done with the check even when the old and new
617      * strings are different lengths).
618      */
619     if ((q = strstr(p, old)) == NULL)
620       break;
621     /* This is undefined if q - p > PTRDIFF_MAX */
622     ptrdiff_t l = q - p;
623     memcpy(r, p, l);
624     r += l;
625     memcpy(r, new, newlen);
626     r += newlen;
627     p = q + oldlen;
628   }
629   strncpy(r, p, strlen(p));
630
631   return ret;
632 } /* }}} char *replace_str */
633
634 static char *replace_json_reserved(const char *message) /* {{{ */
635 {
636   char *msg = replace_str(message, "\\", "\\\\");
637   if (msg == NULL) {
638     ERROR("write_sensu plugin: Unable to alloc memory");
639     return NULL;
640   }
641   char *tmp = replace_str(msg, "\"", "\\\"");
642   free(msg);
643   if (tmp == NULL) {
644     ERROR("write_sensu plugin: Unable to alloc memory");
645     return NULL;
646   }
647   msg = replace_str(tmp, "\n", "\\\n");
648   free(tmp);
649   if (msg == NULL) {
650     ERROR("write_sensu plugin: Unable to alloc memory");
651     return NULL;
652   }
653   return msg;
654 } /* }}} char *replace_json_reserved */
655
656 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
657                                         notification_t const *n) {
658   char service_buffer[6 * DATA_MAX_NAME_LEN];
659   char const *severity;
660   char *ret_str;
661   char *temp_str;
662   int status;
663   size_t i;
664   int res;
665   // add the severity/status
666   switch (n->severity) {
667   case NOTIF_OKAY:
668     severity = "OK";
669     status = 0;
670     break;
671   case NOTIF_WARNING:
672     severity = "WARNING";
673     status = 1;
674     break;
675   case NOTIF_FAILURE:
676     severity = "CRITICAL";
677     status = 2;
678     break;
679   default:
680     severity = "UNKNOWN";
681     status = 3;
682   }
683   res = my_asprintf(&temp_str, "{\"status\": %d", status);
684   if (res == -1) {
685     ERROR("write_sensu plugin: Unable to alloc memory");
686     return NULL;
687   }
688   ret_str = temp_str;
689
690   // incorporate the timestamp
691   res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str,
692                     (long long)CDTIME_T_TO_TIME_T(n->time));
693   free(ret_str);
694   if (res == -1) {
695     ERROR("write_sensu plugin: Unable to alloc memory");
696     return NULL;
697   }
698   ret_str = temp_str;
699
700   char *handlers_str =
701       build_json_str_list("handlers", &(host->notification_handlers));
702   if (handlers_str == NULL) {
703     ERROR("write_sensu plugin: Unable to alloc memory");
704     free(ret_str);
705     return NULL;
706   }
707   // incorporate the handlers
708   if (strlen(handlers_str) != 0) {
709     res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
710     free(ret_str);
711     free(handlers_str);
712     if (res == -1) {
713       ERROR("write_sensu plugin: Unable to alloc memory");
714       return NULL;
715     }
716     ret_str = temp_str;
717   } else {
718     free(handlers_str);
719   }
720
721   // incorporate the plugin name information if any
722   if (n->plugin[0] != 0) {
723     res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
724                       n->plugin);
725     free(ret_str);
726     if (res == -1) {
727       ERROR("write_sensu plugin: Unable to alloc memory");
728       return NULL;
729     }
730     ret_str = temp_str;
731   }
732
733   // incorporate the plugin type if any
734   if (n->type[0] != 0) {
735     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"",
736                       ret_str, n->type);
737     free(ret_str);
738     if (res == -1) {
739       ERROR("write_sensu plugin: Unable to alloc memory");
740       return NULL;
741     }
742     ret_str = temp_str;
743   }
744
745   // incorporate the plugin instance if any
746   if (n->plugin_instance[0] != 0) {
747     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
748                       ret_str, n->plugin_instance);
749     free(ret_str);
750     if (res == -1) {
751       ERROR("write_sensu plugin: Unable to alloc memory");
752       return NULL;
753     }
754     ret_str = temp_str;
755   }
756
757   // incorporate the plugin type instance if any
758   if (n->type_instance[0] != 0) {
759     res =
760         my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
761                     ret_str, n->type_instance);
762     free(ret_str);
763     if (res == -1) {
764       ERROR("write_sensu plugin: Unable to alloc memory");
765       return NULL;
766     }
767     ret_str = temp_str;
768   }
769
770   // add key value attributes from config if any
771   for (i = 0; i < sensu_attrs_num; i += 2) {
772     res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
773                       sensu_attrs[i + 1]);
774     free(ret_str);
775     if (res == -1) {
776       ERROR("write_sensu plugin: Unable to alloc memory");
777       return NULL;
778     }
779     ret_str = temp_str;
780   }
781
782   // incorporate sensu tags from config if any
783   if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
784     res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
785     free(ret_str);
786     if (res == -1) {
787       ERROR("write_sensu plugin: Unable to alloc memory");
788       return NULL;
789     }
790     ret_str = temp_str;
791   }
792
793   // incorporate the service name
794   sensu_format_name2(service_buffer, sizeof(service_buffer),
795                      /* host */ "", n->plugin, n->plugin_instance, n->type,
796                      n->type_instance, host->separator);
797   // replace sensu event name chars that are considered illegal
798   in_place_replace_sensu_name_reserved(service_buffer);
799   res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str,
800                     &service_buffer[1]);
801   free(ret_str);
802   if (res == -1) {
803     ERROR("write_sensu plugin: Unable to alloc memory");
804     return NULL;
805   }
806   ret_str = temp_str;
807
808   // incorporate the check output
809   if (n->message[0] != 0) {
810     char *msg = replace_json_reserved(n->message);
811     if (msg == NULL) {
812       ERROR("write_sensu plugin: Unable to alloc memory");
813       free(ret_str);
814       return NULL;
815     }
816     res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str,
817                       severity, msg);
818     free(ret_str);
819     free(msg);
820     if (res == -1) {
821       ERROR("write_sensu plugin: Unable to alloc memory");
822       return NULL;
823     }
824     ret_str = temp_str;
825   }
826
827   // Pull in values from threshold and add extra attributes
828   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
829     if (strcasecmp("CurrentValue", meta->name) == 0 &&
830         meta->type == NM_TYPE_DOUBLE) {
831       res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str,
832                         meta->nm_value.nm_double);
833       free(ret_str);
834       if (res == -1) {
835         ERROR("write_sensu plugin: Unable to alloc memory");
836         return NULL;
837       }
838       ret_str = temp_str;
839     }
840     if (meta->type == NM_TYPE_STRING) {
841       res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name,
842                         meta->nm_value.nm_string);
843       free(ret_str);
844       if (res == -1) {
845         ERROR("write_sensu plugin: Unable to alloc memory");
846         return NULL;
847       }
848       ret_str = temp_str;
849     }
850   }
851
852   // close the curly bracket
853   res = my_asprintf(&temp_str, "%s}\n", ret_str);
854   free(ret_str);
855   if (res == -1) {
856     ERROR("write_sensu plugin: Unable to alloc memory");
857     return NULL;
858   }
859   ret_str = temp_str;
860
861   DEBUG("write_sensu plugin: Successfully created JSON for notification: "
862         "host = \"%s\", service = \"%s\", state = \"%s\"",
863         n->host, service_buffer, severity);
864   return ret_str;
865 } /* }}} char *sensu_notification_to_json */
866
867 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
868 {
869   int status = 0;
870   size_t buffer_len;
871
872   status = sensu_connect(host);
873   if (status != 0)
874     return status;
875
876   buffer_len = strlen(msg);
877
878   status = (int)swrite(host->s, msg, buffer_len);
879   sensu_close_socket(host);
880
881   if (status != 0) {
882     char errbuf[1024];
883     ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
884           (host->node != NULL) ? host->node : SENSU_HOST,
885           (host->service != NULL) ? host->service : SENSU_PORT,
886           sstrerror(errno, errbuf, sizeof(errbuf)));
887     return -1;
888   }
889
890   return 0;
891 } /* }}} int sensu_send_msg */
892
893 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
894 {
895   int status = 0;
896
897   status = sensu_send_msg(host, msg);
898   if (status != 0) {
899     host->flags &= ~F_READY;
900     if (host->res != NULL) {
901       freeaddrinfo(host->res);
902       host->res = NULL;
903     }
904     return status;
905   }
906
907   return 0;
908 } /* }}} int sensu_send */
909
910 static int sensu_write(const data_set_t *ds, /* {{{ */
911                        const value_list_t *vl, user_data_t *ud) {
912   int status = 0;
913   int statuses[vl->values_len];
914   struct sensu_host *host = ud->data;
915   gauge_t *rates = NULL;
916   char *msg;
917
918   pthread_mutex_lock(&host->lock);
919   memset(statuses, 0, vl->values_len * sizeof(*statuses));
920
921   if (host->store_rates) {
922     rates = uc_get_rate(ds, vl);
923     if (rates == NULL) {
924       ERROR("write_sensu plugin: uc_get_rate failed.");
925       pthread_mutex_unlock(&host->lock);
926       return -1;
927     }
928   }
929   for (size_t i = 0; i < vl->values_len; i++) {
930     msg = sensu_value_to_json(host, ds, vl, (int)i, rates, statuses[i]);
931     if (msg == NULL) {
932       sfree(rates);
933       pthread_mutex_unlock(&host->lock);
934       return -1;
935     }
936     status = sensu_send(host, msg);
937     free(msg);
938     if (status != 0) {
939       ERROR("write_sensu plugin: sensu_send failed with status %i", status);
940       pthread_mutex_unlock(&host->lock);
941       sfree(rates);
942       return status;
943     }
944   }
945   sfree(rates);
946   pthread_mutex_unlock(&host->lock);
947   return status;
948 } /* }}} int sensu_write */
949
950 static int sensu_notification(const notification_t *n,
951                               user_data_t *ud) /* {{{ */
952 {
953   int status;
954   struct sensu_host *host = ud->data;
955   char *msg;
956
957   pthread_mutex_lock(&host->lock);
958
959   msg = sensu_notification_to_json(host, n);
960   if (msg == NULL) {
961     pthread_mutex_unlock(&host->lock);
962     return -1;
963   }
964
965   status = sensu_send(host, msg);
966   free(msg);
967   if (status != 0)
968     ERROR("write_sensu plugin: sensu_send failed with status %i", status);
969   pthread_mutex_unlock(&host->lock);
970
971   return status;
972 } /* }}} int sensu_notification */
973
974 static void sensu_free(void *p) /* {{{ */
975 {
976   struct sensu_host *host = p;
977
978   if (host == NULL)
979     return;
980
981   pthread_mutex_lock(&host->lock);
982
983   host->reference_count--;
984   if (host->reference_count > 0) {
985     pthread_mutex_unlock(&host->lock);
986     return;
987   }
988
989   sensu_close_socket(host);
990   if (host->res != NULL) {
991     freeaddrinfo(host->res);
992     host->res = NULL;
993   }
994   sfree(host->service);
995   sfree(host->event_service_prefix);
996   sfree(host->name);
997   sfree(host->node);
998   sfree(host->separator);
999   free_str_list(&(host->metric_handlers));
1000   free_str_list(&(host->notification_handlers));
1001   pthread_mutex_destroy(&host->lock);
1002   sfree(host);
1003 } /* }}} void sensu_free */
1004
1005 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
1006 {
1007   struct sensu_host *host = NULL;
1008   int status = 0;
1009   oconfig_item_t *child;
1010   char callback_name[DATA_MAX_NAME_LEN];
1011
1012   if ((host = calloc(1, sizeof(*host))) == NULL) {
1013     ERROR("write_sensu plugin: calloc failed.");
1014     return ENOMEM;
1015   }
1016   pthread_mutex_init(&host->lock, NULL);
1017   host->reference_count = 1;
1018   host->node = NULL;
1019   host->service = NULL;
1020   host->notifications = 0;
1021   host->metrics = 0;
1022   host->store_rates = 1;
1023   host->always_append_ds = 0;
1024   host->metric_handlers.nb_strs = 0;
1025   host->metric_handlers.strs = NULL;
1026   host->notification_handlers.nb_strs = 0;
1027   host->notification_handlers.strs = NULL;
1028   host->separator = strdup("/");
1029   if (host->separator == NULL) {
1030     ERROR("write_sensu plugin: Unable to alloc memory");
1031     sensu_free(host);
1032     return -1;
1033   }
1034
1035   status = cf_util_get_string(ci, &host->name);
1036   if (status != 0) {
1037     WARNING("write_sensu plugin: Required host name is missing.");
1038     sensu_free(host);
1039     return -1;
1040   }
1041
1042   for (int i = 0; i < ci->children_num; i++) {
1043     child = &ci->children[i];
1044     status = 0;
1045
1046     if (strcasecmp("Host", child->key) == 0) {
1047       status = cf_util_get_string(child, &host->node);
1048       if (status != 0)
1049         break;
1050     } else if (strcasecmp("Notifications", child->key) == 0) {
1051       status = cf_util_get_boolean(child, &host->notifications);
1052       if (status != 0)
1053         break;
1054     } else if (strcasecmp("Metrics", child->key) == 0) {
1055       status = cf_util_get_boolean(child, &host->metrics);
1056       if (status != 0)
1057         break;
1058     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1059       status = cf_util_get_string(child, &host->event_service_prefix);
1060       if (status != 0)
1061         break;
1062     } else if (strcasecmp("Separator", child->key) == 0) {
1063       status = cf_util_get_string(child, &host->separator);
1064       if (status != 0)
1065         break;
1066     } else if (strcasecmp("MetricHandler", child->key) == 0) {
1067       char *temp_str = NULL;
1068       status = cf_util_get_string(child, &temp_str);
1069       if (status != 0)
1070         break;
1071       status = add_str_to_list(&(host->metric_handlers), temp_str);
1072       free(temp_str);
1073       if (status != 0)
1074         break;
1075     } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1076       char *temp_str = NULL;
1077       status = cf_util_get_string(child, &temp_str);
1078       if (status != 0)
1079         break;
1080       status = add_str_to_list(&(host->notification_handlers), temp_str);
1081       free(temp_str);
1082       if (status != 0)
1083         break;
1084     } else if (strcasecmp("Port", child->key) == 0) {
1085       status = cf_util_get_service(child, &host->service);
1086       if (status != 0) {
1087         ERROR("write_sensu plugin: Invalid argument "
1088               "configured for the \"Port\" "
1089               "option.");
1090         break;
1091       }
1092     } else if (strcasecmp("StoreRates", child->key) == 0) {
1093       status = cf_util_get_boolean(child, &host->store_rates);
1094       if (status != 0)
1095         break;
1096     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1097       status = cf_util_get_boolean(child, &host->always_append_ds);
1098       if (status != 0)
1099         break;
1100     } else {
1101       WARNING("write_sensu plugin: ignoring unknown config "
1102               "option: \"%s\"",
1103               child->key);
1104     }
1105   }
1106   if (status != 0) {
1107     sensu_free(host);
1108     return status;
1109   }
1110
1111   if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1112     sensu_free(host);
1113     WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. "
1114             "Giving up.");
1115     return -1;
1116   }
1117
1118   if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1119     sensu_free(host);
1120     WARNING("write_sensu plugin: notifications enabled but no "
1121             "NotificationHandler defined. Giving up.");
1122     return -1;
1123   }
1124
1125   if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1126     WARNING("write_sensu plugin: NotificationHandler given so forcing "
1127             "notifications to be enabled");
1128     host->notifications = 1;
1129   }
1130
1131   if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1132     WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be "
1133             "enabled");
1134     host->metrics = 1;
1135   }
1136
1137   if (!(host->notifications || host->metrics)) {
1138     WARNING("write_sensu plugin: neither metrics nor notifications enabled. "
1139             "Giving up.");
1140     sensu_free(host);
1141     return -1;
1142   }
1143
1144   ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1145
1146   user_data_t ud = {.data = host, .free_func = sensu_free};
1147
1148   pthread_mutex_lock(&host->lock);
1149
1150   if (host->metrics) {
1151     status = plugin_register_write(callback_name, sensu_write, &ud);
1152     if (status != 0)
1153       WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1154               "failed with status %i.",
1155               callback_name, status);
1156     else /* success */
1157       host->reference_count++;
1158   }
1159
1160   if (host->notifications) {
1161     status =
1162         plugin_register_notification(callback_name, sensu_notification, &ud);
1163     if (status != 0)
1164       WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1165               "failed with status %i.",
1166               callback_name, status);
1167     else
1168       host->reference_count++;
1169   }
1170
1171   if (host->reference_count <= 1) {
1172     /* Both callbacks failed => free memory.
1173      * We need to unlock here, because sensu_free() will lock.
1174      * This is not a race condition, because we're the only one
1175      * holding a reference. */
1176     pthread_mutex_unlock(&host->lock);
1177     sensu_free(host);
1178     return -1;
1179   }
1180
1181   host->reference_count--;
1182   pthread_mutex_unlock(&host->lock);
1183
1184   return status;
1185 } /* }}} int sensu_config_node */
1186
1187 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1188 {
1189   oconfig_item_t *child;
1190   int status;
1191   struct str_list sensu_tags_arr;
1192
1193   sensu_tags_arr.nb_strs = 0;
1194   sensu_tags_arr.strs = NULL;
1195
1196   for (int i = 0; i < ci->children_num; i++) {
1197     child = &ci->children[i];
1198
1199     if (strcasecmp("Node", child->key) == 0) {
1200       sensu_config_node(child);
1201     } else if (strcasecmp(child->key, "attribute") == 0) {
1202       if (child->values_num != 2) {
1203         WARNING("sensu attributes need both a key and a value.");
1204         continue;
1205       }
1206       if (child->values[0].type != OCONFIG_TYPE_STRING ||
1207           child->values[1].type != OCONFIG_TYPE_STRING) {
1208         WARNING("sensu attribute needs string arguments.");
1209         continue;
1210       }
1211
1212       strarray_add(&sensu_attrs, &sensu_attrs_num,
1213                    child->values[0].value.string);
1214       strarray_add(&sensu_attrs, &sensu_attrs_num,
1215                    child->values[1].value.string);
1216
1217       DEBUG("write_sensu plugin: New attribute: %s => %s",
1218             child->values[0].value.string, child->values[1].value.string);
1219     } else if (strcasecmp(child->key, "tag") == 0) {
1220       char *tmp = NULL;
1221       status = cf_util_get_string(child, &tmp);
1222       if (status != 0)
1223         continue;
1224
1225       status = add_str_to_list(&sensu_tags_arr, tmp);
1226       sfree(tmp);
1227       if (status != 0)
1228         continue;
1229       DEBUG("write_sensu plugin: Got tag: %s", tmp);
1230     } else {
1231       WARNING("write_sensu plugin: Ignoring unknown "
1232               "configuration option \"%s\" at top level.",
1233               child->key);
1234     }
1235   }
1236   if (sensu_tags_arr.nb_strs > 0) {
1237     sfree(sensu_tags);
1238     sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1239     free_str_list(&sensu_tags_arr);
1240     if (sensu_tags == NULL) {
1241       ERROR("write_sensu plugin: Unable to alloc memory");
1242       return -1;
1243     }
1244   }
1245   return 0;
1246 } /* }}} int sensu_config */
1247
1248 void module_register(void) {
1249   plugin_register_complex_config("write_sensu", sensu_config);
1250 }