Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / write_sensu.c
1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26
27 #define _GNU_SOURCE
28
29 #include "collectd.h"
30
31 #include "common.h"
32 #include "plugin.h"
33 #include "utils_cache.h"
34
35 #include <arpa/inet.h>
36 #include <errno.h>
37 #include <inttypes.h>
38 #include <netdb.h>
39 #include <stddef.h>
40
41 #include <stdlib.h>
42 #define SENSU_HOST "localhost"
43 #define SENSU_PORT "3030"
44
45 #ifdef HAVE_ASPRINTF
46 #define my_asprintf asprintf
47 #define my_vasprintf vasprintf
48 #else
49 /*
50  * asprintf() is available from Solaris 10 update 11.
51  * For older versions, use asprintf() portable implementation from
52  * https://github.com/littlstar/asprintf.c/blob/master/
53  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
54  */
55
56 static int my_vasprintf(char **str, const char *fmt, va_list args) {
57   int size = 0;
58   va_list tmpa;
59   // copy
60   va_copy(tmpa, args);
61   // apply variadic arguments to
62   // sprintf with format to get size
63   size = vsnprintf(NULL, size, fmt, tmpa);
64   // toss args
65   va_end(tmpa);
66   // return -1 to be compliant if
67   // size is less than 0
68   if (size < 0) {
69     return -1;
70   }
71   // alloc with size plus 1 for `\0'
72   *str = (char *)malloc(size + 1);
73   // return -1 to be compliant
74   // if pointer is `NULL'
75   if (NULL == *str) {
76     return -1;
77   }
78   // format string with original
79   // variadic arguments and set new size
80   size = vsprintf(*str, fmt, args);
81   return size;
82 }
83
84 static int my_asprintf(char **str, const char *fmt, ...) {
85   int size = 0;
86   va_list args;
87   // init variadic argumens
88   va_start(args, fmt);
89   // format and get size
90   size = my_vasprintf(str, fmt, args);
91   // toss args
92   va_end(args);
93   return size;
94 }
95
96 #endif
97
98 struct str_list {
99   int nb_strs;
100   char **strs;
101 };
102
103 struct sensu_host {
104   char *name;
105   char *event_service_prefix;
106   struct str_list metric_handlers;
107   struct str_list notification_handlers;
108 #define F_READY 0x01
109   uint8_t flags;
110   pthread_mutex_t lock;
111   _Bool notifications;
112   _Bool metrics;
113   _Bool store_rates;
114   _Bool always_append_ds;
115   char *separator;
116   char *node;
117   char *service;
118   int s;
119   struct addrinfo *res;
120   int reference_count;
121 };
122
123 static char *sensu_tags = NULL;
124 static char **sensu_attrs = NULL;
125 static size_t sensu_attrs_num;
126
127 static int add_str_to_list(struct str_list *strs,
128                            const char *str_to_add) /* {{{ */
129 {
130   char **old_strs_ptr = strs->strs;
131   char *newstr = strdup(str_to_add);
132   if (newstr == NULL) {
133     ERROR("write_sensu plugin: Unable to alloc memory");
134     return -1;
135   }
136   strs->strs = realloc(strs->strs, strs->nb_strs + 1);
137   if (strs->strs == NULL) {
138     strs->strs = old_strs_ptr;
139     free(newstr);
140     ERROR("write_sensu plugin: Unable to alloc memory");
141     return -1;
142   }
143   strs->strs[strs->nb_strs] = newstr;
144   strs->nb_strs++;
145   return 0;
146 }
147 /* }}} int add_str_to_list */
148
149 static void free_str_list(struct str_list *strs) /* {{{ */
150 {
151   for (int i = 0; i < strs->nb_strs; i++)
152     free(strs->strs[i]);
153   free(strs->strs);
154 }
155 /* }}} void free_str_list */
156
157 static int sensu_connect(struct sensu_host *host) /* {{{ */
158 {
159   int e;
160   char const *node;
161   char const *service;
162
163   // Resolve the target if we haven't done already
164   if (!(host->flags & F_READY)) {
165     memset(&service, 0, sizeof(service));
166     host->res = NULL;
167
168     node = (host->node != NULL) ? host->node : SENSU_HOST;
169     service = (host->service != NULL) ? host->service : SENSU_PORT;
170
171     struct addrinfo ai_hints = {.ai_family = AF_INET,
172                                 .ai_flags = AI_ADDRCONFIG,
173                                 .ai_socktype = SOCK_STREAM};
174
175     if ((e = getaddrinfo(node, service, &ai_hints, &(host->res))) != 0) {
176       ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s", node,
177             gai_strerror(e));
178       return -1;
179     }
180     DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s", node,
181           service);
182     host->flags |= F_READY;
183   }
184
185   struct linger so_linger;
186   host->s = -1;
187   for (struct addrinfo *ai = host->res; ai != NULL; ai = ai->ai_next) {
188     // create the socket
189     if ((host->s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) ==
190         -1) {
191       continue;
192     }
193
194     // Set very low close() lingering
195     so_linger.l_onoff = 1;
196     so_linger.l_linger = 3;
197     if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger,
198                    sizeof so_linger) != 0)
199       WARNING("write_sensu plugin: failed to set socket close() lingering");
200
201     set_sock_opts(host->s);
202
203     // connect the socket
204     if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
205       close(host->s);
206       host->s = -1;
207       continue;
208     }
209     DEBUG("write_sensu plugin: connected");
210     break;
211   }
212
213   if (host->s < 0) {
214     WARNING("write_sensu plugin: Unable to connect to sensu client");
215     return -1;
216   }
217   return 0;
218 } /* }}} int sensu_connect */
219
220 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
221 {
222   if (host->s != -1)
223     close(host->s);
224   host->s = -1;
225
226 } /* }}} void sensu_close_socket */
227
228 static char *build_json_str_list(const char *tag,
229                                  struct str_list const *list) /* {{{ */
230 {
231   int res;
232   char *ret_str = NULL;
233   char *temp_str;
234   if (list->nb_strs == 0) {
235     ret_str = malloc(1);
236     if (ret_str == NULL) {
237       ERROR("write_sensu plugin: Unable to alloc memory");
238       return NULL;
239     }
240     ret_str[0] = '\0';
241   }
242
243   res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
244   if (res == -1) {
245     ERROR("write_sensu plugin: Unable to alloc memory");
246     free(ret_str);
247     return NULL;
248   }
249   for (int i = 1; i < list->nb_strs; i++) {
250     res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
251     free(temp_str);
252     if (res == -1) {
253       ERROR("write_sensu plugin: Unable to alloc memory");
254       return NULL;
255     }
256     temp_str = ret_str;
257   }
258   res = my_asprintf(&ret_str, "%s]", temp_str);
259   free(temp_str);
260   if (res == -1) {
261     ERROR("write_sensu plugin: Unable to alloc memory");
262     return NULL;
263   }
264
265   return ret_str;
266 } /* }}} char *build_json_str_list*/
267
268 static int sensu_format_name2(char *ret, int ret_len, const char *hostname,
269                               const char *plugin, const char *plugin_instance,
270                               const char *type, const char *type_instance,
271                               const char *separator) {
272   char *buffer;
273   size_t buffer_size;
274
275   buffer = ret;
276   buffer_size = (size_t)ret_len;
277
278 #define APPEND(str)                                                            \
279   do {                                                                         \
280     size_t l = strlen(str);                                                    \
281     if (l >= buffer_size)                                                      \
282       return ENOBUFS;                                                          \
283     memcpy(buffer, (str), l);                                                  \
284     buffer += l;                                                               \
285     buffer_size -= l;                                                          \
286   } while (0)
287
288   assert(plugin != NULL);
289   assert(type != NULL);
290
291   APPEND(hostname);
292   APPEND(separator);
293   APPEND(plugin);
294   if ((plugin_instance != NULL) && (plugin_instance[0] != 0)) {
295     APPEND("-");
296     APPEND(plugin_instance);
297   }
298   APPEND(separator);
299   APPEND(type);
300   if ((type_instance != NULL) && (type_instance[0] != 0)) {
301     APPEND("-");
302     APPEND(type_instance);
303   }
304   assert(buffer_size > 0);
305   buffer[0] = 0;
306
307 #undef APPEND
308   return 0;
309 } /* int sensu_format_name2 */
310
311 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
312 {
313   int len = strlen(orig_name);
314   for (int i = 0; i < len; i++) {
315     // some plugins like ipmi generate special characters in metric name
316     switch (orig_name[i]) {
317     case '(':
318       orig_name[i] = '_';
319       break;
320     case ')':
321       orig_name[i] = '_';
322       break;
323     case ' ':
324       orig_name[i] = '_';
325       break;
326     case '"':
327       orig_name[i] = '_';
328       break;
329     case '\'':
330       orig_name[i] = '_';
331       break;
332     case '+':
333       orig_name[i] = '_';
334       break;
335     }
336   }
337 } /* }}} char *replace_sensu_name_reserved */
338
339 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
340                                  data_set_t const *ds, value_list_t const *vl,
341                                  size_t index, gauge_t const *rates,
342                                  int status) {
343   char name_buffer[5 * DATA_MAX_NAME_LEN];
344   char service_buffer[6 * DATA_MAX_NAME_LEN];
345   char *ret_str;
346   char *temp_str;
347   char *value_str;
348   int res;
349   // First part of the JSON string
350   const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
351
352   char *handlers_str =
353       build_json_str_list("handlers", &(host->metric_handlers));
354   if (handlers_str == NULL) {
355     ERROR("write_sensu plugin: Unable to alloc memory");
356     return NULL;
357   }
358
359   // incorporate the handlers
360   if (strlen(handlers_str) == 0) {
361     free(handlers_str);
362     ret_str = strdup(part1);
363     if (ret_str == NULL) {
364       ERROR("write_sensu plugin: Unable to alloc memory");
365       return NULL;
366     }
367   } else {
368     res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
369     free(handlers_str);
370     if (res == -1) {
371       ERROR("write_sensu plugin: Unable to alloc memory");
372       return NULL;
373     }
374   }
375
376   // incorporate the plugin name information
377   res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
378                     vl->plugin);
379   free(ret_str);
380   if (res == -1) {
381     ERROR("write_sensu plugin: Unable to alloc memory");
382     return NULL;
383   }
384   ret_str = temp_str;
385
386   // incorporate the plugin type
387   res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str,
388                     vl->type);
389   free(ret_str);
390   if (res == -1) {
391     ERROR("write_sensu plugin: Unable to alloc memory");
392     return NULL;
393   }
394   ret_str = temp_str;
395
396   // incorporate the plugin instance if any
397   if (vl->plugin_instance[0] != 0) {
398     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
399                       ret_str, vl->plugin_instance);
400     free(ret_str);
401     if (res == -1) {
402       ERROR("write_sensu plugin: Unable to alloc memory");
403       return NULL;
404     }
405     ret_str = temp_str;
406   }
407
408   // incorporate the plugin type instance if any
409   if (vl->type_instance[0] != 0) {
410     res =
411         my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
412                     ret_str, vl->type_instance);
413     free(ret_str);
414     if (res == -1) {
415       ERROR("write_sensu plugin: Unable to alloc memory");
416       return NULL;
417     }
418     ret_str = temp_str;
419   }
420
421   // incorporate the data source type
422   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
423     char ds_type[DATA_MAX_NAME_LEN];
424     snprintf(ds_type, sizeof(ds_type), "%s:rate",
425              DS_TYPE_TO_STRING(ds->ds[index].type));
426     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
427                       ret_str, ds_type);
428     free(ret_str);
429     if (res == -1) {
430       ERROR("write_sensu plugin: Unable to alloc memory");
431       return NULL;
432     }
433     ret_str = temp_str;
434   } else {
435     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
436                       ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
437     free(ret_str);
438     if (res == -1) {
439       ERROR("write_sensu plugin: Unable to alloc memory");
440       return NULL;
441     }
442     ret_str = temp_str;
443   }
444
445   // incorporate the data source name
446   res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"",
447                     ret_str, ds->ds[index].name);
448   free(ret_str);
449   if (res == -1) {
450     ERROR("write_sensu plugin: Unable to alloc memory");
451     return NULL;
452   }
453   ret_str = temp_str;
454
455   // incorporate the data source index
456   {
457     char ds_index[DATA_MAX_NAME_LEN];
458     snprintf(ds_index, sizeof(ds_index), "%zu", index);
459     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s",
460                       ret_str, ds_index);
461     free(ret_str);
462     if (res == -1) {
463       ERROR("write_sensu plugin: Unable to alloc memory");
464       return NULL;
465     }
466     ret_str = temp_str;
467   }
468
469   // add key value attributes from config if any
470   for (size_t i = 0; i < sensu_attrs_num; i += 2) {
471     res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
472                       sensu_attrs[i + 1]);
473     free(ret_str);
474     if (res == -1) {
475       ERROR("write_sensu plugin: Unable to alloc memory");
476       return NULL;
477     }
478     ret_str = temp_str;
479   }
480
481   // incorporate sensu tags from config if any
482   if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
483     res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
484     free(ret_str);
485     if (res == -1) {
486       ERROR("write_sensu plugin: Unable to alloc memory");
487       return NULL;
488     }
489     ret_str = temp_str;
490   }
491
492   // calculate the value and set to a string
493   if (ds->ds[index].type == DS_TYPE_GAUGE) {
494     res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
495     if (res == -1) {
496       free(ret_str);
497       ERROR("write_sensu plugin: Unable to alloc memory");
498       return NULL;
499     }
500   } else if (rates != NULL) {
501     res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
502     if (res == -1) {
503       free(ret_str);
504       ERROR("write_sensu plugin: Unable to alloc memory");
505       return NULL;
506     }
507   } else {
508     if (ds->ds[index].type == DS_TYPE_DERIVE) {
509       res = my_asprintf(&value_str, "%" PRIi64, vl->values[index].derive);
510       if (res == -1) {
511         free(ret_str);
512         ERROR("write_sensu plugin: Unable to alloc memory");
513         return NULL;
514       }
515     } else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
516       res = my_asprintf(&value_str, "%" PRIu64, vl->values[index].absolute);
517       if (res == -1) {
518         free(ret_str);
519         ERROR("write_sensu plugin: Unable to alloc memory");
520         return NULL;
521       }
522     } else {
523       res = my_asprintf(&value_str, "%llu", vl->values[index].counter);
524       if (res == -1) {
525         free(ret_str);
526         ERROR("write_sensu plugin: Unable to alloc memory");
527         return NULL;
528       }
529     }
530   }
531
532   // Generate the full service name
533   sensu_format_name2(name_buffer, sizeof(name_buffer), vl->host, vl->plugin,
534                      vl->plugin_instance, vl->type, vl->type_instance,
535                      host->separator);
536   if (host->always_append_ds || (ds->ds_num > 1)) {
537     if (host->event_service_prefix == NULL)
538       snprintf(service_buffer, sizeof(service_buffer), "%s.%s", name_buffer,
539                ds->ds[index].name);
540     else
541       snprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
542                host->event_service_prefix, name_buffer, ds->ds[index].name);
543   } else {
544     if (host->event_service_prefix == NULL)
545       sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
546     else
547       snprintf(service_buffer, sizeof(service_buffer), "%s%s",
548                host->event_service_prefix, name_buffer);
549   }
550
551   // Replace collectd sensor name reserved characters so that time series DB is
552   // happy
553   in_place_replace_sensu_name_reserved(service_buffer);
554
555   // finalize the buffer by setting the output and closing curly bracket
556   res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n", ret_str,
557                     service_buffer, value_str,
558                     (long long)CDTIME_T_TO_TIME_T(vl->time));
559   free(ret_str);
560   free(value_str);
561   if (res == -1) {
562     ERROR("write_sensu plugin: Unable to alloc memory");
563     return NULL;
564   }
565   ret_str = temp_str;
566
567   DEBUG("write_sensu plugin: Successfully created json for metric: "
568         "host = \"%s\", service = \"%s\"",
569         vl->host, service_buffer);
570   return ret_str;
571 } /* }}} char *sensu_value_to_json */
572
573 /*
574  * Uses replace_str2() implementation from
575  * http://creativeandcritical.net/str-replace-c/
576  * copyright (c) Laird Shaw, under public domain.
577  */
578 static char *replace_str(const char *str, const char *old, /* {{{ */
579                          const char *new) {
580   char *ret, *r;
581   const char *p, *q;
582   size_t oldlen = strlen(old);
583   size_t count = strlen(new);
584   size_t retlen;
585   size_t newlen = count;
586   int samesize = (oldlen == newlen);
587
588   if (!samesize) {
589     for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
590       count++;
591     /* This is undefined if p - str > PTRDIFF_MAX */
592     retlen = p - str + strlen(p) + count * (newlen - oldlen);
593   } else
594     retlen = strlen(str);
595
596   ret = calloc(1, retlen + 1);
597   if (ret == NULL)
598     return NULL;
599   // added to original: not optimized, but keeps valgrind happy.
600
601   r = ret;
602   p = str;
603   while (1) {
604     /* If the old and new strings are different lengths - in other
605      * words we have already iterated through with strstr above,
606      * and thus we know how many times we need to call it - then we
607      * can avoid the final (potentially lengthy) call to strstr,
608      * which we already know is going to return NULL, by
609      * decrementing and checking count.
610      */
611     if (!samesize && !count--)
612       break;
613     /* Otherwise i.e. when the old and new strings are the same
614      * length, and we don't know how many times to call strstr,
615      * we must check for a NULL return here (we check it in any
616      * event, to avoid further conditions, and because there's
617      * no harm done with the check even when the old and new
618      * strings are different lengths).
619      */
620     if ((q = strstr(p, old)) == NULL)
621       break;
622     /* This is undefined if q - p > PTRDIFF_MAX */
623     ptrdiff_t l = q - p;
624     memcpy(r, p, l);
625     r += l;
626     memcpy(r, new, newlen);
627     r += newlen;
628     p = q + oldlen;
629   }
630   strncpy(r, p, strlen(p));
631
632   return ret;
633 } /* }}} char *replace_str */
634
635 static char *replace_json_reserved(const char *message) /* {{{ */
636 {
637   char *msg = replace_str(message, "\\", "\\\\");
638   if (msg == NULL) {
639     ERROR("write_sensu plugin: Unable to alloc memory");
640     return NULL;
641   }
642   char *tmp = replace_str(msg, "\"", "\\\"");
643   free(msg);
644   if (tmp == NULL) {
645     ERROR("write_sensu plugin: Unable to alloc memory");
646     return NULL;
647   }
648   msg = replace_str(tmp, "\n", "\\\n");
649   free(tmp);
650   if (msg == NULL) {
651     ERROR("write_sensu plugin: Unable to alloc memory");
652     return NULL;
653   }
654   return msg;
655 } /* }}} char *replace_json_reserved */
656
657 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
658                                         notification_t const *n) {
659   char service_buffer[6 * DATA_MAX_NAME_LEN];
660   char const *severity;
661   char *ret_str;
662   char *temp_str;
663   int status;
664   size_t i;
665   int res;
666   // add the severity/status
667   switch (n->severity) {
668   case NOTIF_OKAY:
669     severity = "OK";
670     status = 0;
671     break;
672   case NOTIF_WARNING:
673     severity = "WARNING";
674     status = 1;
675     break;
676   case NOTIF_FAILURE:
677     severity = "CRITICAL";
678     status = 2;
679     break;
680   default:
681     severity = "UNKNOWN";
682     status = 3;
683   }
684   res = my_asprintf(&temp_str, "{\"status\": %d", status);
685   if (res == -1) {
686     ERROR("write_sensu plugin: Unable to alloc memory");
687     return NULL;
688   }
689   ret_str = temp_str;
690
691   // incorporate the timestamp
692   res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str,
693                     (long long)CDTIME_T_TO_TIME_T(n->time));
694   free(ret_str);
695   if (res == -1) {
696     ERROR("write_sensu plugin: Unable to alloc memory");
697     return NULL;
698   }
699   ret_str = temp_str;
700
701   char *handlers_str =
702       build_json_str_list("handlers", &(host->notification_handlers));
703   if (handlers_str == NULL) {
704     ERROR("write_sensu plugin: Unable to alloc memory");
705     free(ret_str);
706     return NULL;
707   }
708   // incorporate the handlers
709   if (strlen(handlers_str) != 0) {
710     res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
711     free(ret_str);
712     free(handlers_str);
713     if (res == -1) {
714       ERROR("write_sensu plugin: Unable to alloc memory");
715       return NULL;
716     }
717     ret_str = temp_str;
718   } else {
719     free(handlers_str);
720   }
721
722   // incorporate the plugin name information if any
723   if (n->plugin[0] != 0) {
724     res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
725                       n->plugin);
726     free(ret_str);
727     if (res == -1) {
728       ERROR("write_sensu plugin: Unable to alloc memory");
729       return NULL;
730     }
731     ret_str = temp_str;
732   }
733
734   // incorporate the plugin type if any
735   if (n->type[0] != 0) {
736     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"",
737                       ret_str, n->type);
738     free(ret_str);
739     if (res == -1) {
740       ERROR("write_sensu plugin: Unable to alloc memory");
741       return NULL;
742     }
743     ret_str = temp_str;
744   }
745
746   // incorporate the plugin instance if any
747   if (n->plugin_instance[0] != 0) {
748     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
749                       ret_str, n->plugin_instance);
750     free(ret_str);
751     if (res == -1) {
752       ERROR("write_sensu plugin: Unable to alloc memory");
753       return NULL;
754     }
755     ret_str = temp_str;
756   }
757
758   // incorporate the plugin type instance if any
759   if (n->type_instance[0] != 0) {
760     res =
761         my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
762                     ret_str, n->type_instance);
763     free(ret_str);
764     if (res == -1) {
765       ERROR("write_sensu plugin: Unable to alloc memory");
766       return NULL;
767     }
768     ret_str = temp_str;
769   }
770
771   // add key value attributes from config if any
772   for (i = 0; i < sensu_attrs_num; i += 2) {
773     res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
774                       sensu_attrs[i + 1]);
775     free(ret_str);
776     if (res == -1) {
777       ERROR("write_sensu plugin: Unable to alloc memory");
778       return NULL;
779     }
780     ret_str = temp_str;
781   }
782
783   // incorporate sensu tags from config if any
784   if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
785     res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
786     free(ret_str);
787     if (res == -1) {
788       ERROR("write_sensu plugin: Unable to alloc memory");
789       return NULL;
790     }
791     ret_str = temp_str;
792   }
793
794   // incorporate the service name
795   sensu_format_name2(service_buffer, sizeof(service_buffer),
796                      /* host */ "", n->plugin, n->plugin_instance, n->type,
797                      n->type_instance, host->separator);
798   // replace sensu event name chars that are considered illegal
799   in_place_replace_sensu_name_reserved(service_buffer);
800   res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str,
801                     &service_buffer[1]);
802   free(ret_str);
803   if (res == -1) {
804     ERROR("write_sensu plugin: Unable to alloc memory");
805     return NULL;
806   }
807   ret_str = temp_str;
808
809   // incorporate the check output
810   if (n->message[0] != 0) {
811     char *msg = replace_json_reserved(n->message);
812     if (msg == NULL) {
813       ERROR("write_sensu plugin: Unable to alloc memory");
814       free(ret_str);
815       return NULL;
816     }
817     res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str,
818                       severity, msg);
819     free(ret_str);
820     free(msg);
821     if (res == -1) {
822       ERROR("write_sensu plugin: Unable to alloc memory");
823       return NULL;
824     }
825     ret_str = temp_str;
826   }
827
828   // Pull in values from threshold and add extra attributes
829   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
830     if (strcasecmp("CurrentValue", meta->name) == 0 &&
831         meta->type == NM_TYPE_DOUBLE) {
832       res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str,
833                         meta->nm_value.nm_double);
834       free(ret_str);
835       if (res == -1) {
836         ERROR("write_sensu plugin: Unable to alloc memory");
837         return NULL;
838       }
839       ret_str = temp_str;
840     }
841     if (meta->type == NM_TYPE_STRING) {
842       res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name,
843                         meta->nm_value.nm_string);
844       free(ret_str);
845       if (res == -1) {
846         ERROR("write_sensu plugin: Unable to alloc memory");
847         return NULL;
848       }
849       ret_str = temp_str;
850     }
851   }
852
853   // close the curly bracket
854   res = my_asprintf(&temp_str, "%s}\n", ret_str);
855   free(ret_str);
856   if (res == -1) {
857     ERROR("write_sensu plugin: Unable to alloc memory");
858     return NULL;
859   }
860   ret_str = temp_str;
861
862   DEBUG("write_sensu plugin: Successfully created JSON for notification: "
863         "host = \"%s\", service = \"%s\", state = \"%s\"",
864         n->host, service_buffer, severity);
865   return ret_str;
866 } /* }}} char *sensu_notification_to_json */
867
868 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
869 {
870   int status = 0;
871   size_t buffer_len;
872
873   status = sensu_connect(host);
874   if (status != 0)
875     return status;
876
877   buffer_len = strlen(msg);
878
879   status = (int)swrite(host->s, msg, buffer_len);
880   sensu_close_socket(host);
881
882   if (status != 0) {
883     char errbuf[1024];
884     ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
885           (host->node != NULL) ? host->node : SENSU_HOST,
886           (host->service != NULL) ? host->service : SENSU_PORT,
887           sstrerror(errno, errbuf, sizeof(errbuf)));
888     return -1;
889   }
890
891   return 0;
892 } /* }}} int sensu_send_msg */
893
894 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
895 {
896   int status = 0;
897
898   status = sensu_send_msg(host, msg);
899   if (status != 0) {
900     host->flags &= ~F_READY;
901     if (host->res != NULL) {
902       freeaddrinfo(host->res);
903       host->res = NULL;
904     }
905     return status;
906   }
907
908   return 0;
909 } /* }}} int sensu_send */
910
911 static int sensu_write(const data_set_t *ds, /* {{{ */
912                        const value_list_t *vl, user_data_t *ud) {
913   int status = 0;
914   int statuses[vl->values_len];
915   struct sensu_host *host = ud->data;
916   gauge_t *rates = NULL;
917   char *msg;
918
919   pthread_mutex_lock(&host->lock);
920   memset(statuses, 0, vl->values_len * sizeof(*statuses));
921
922   if (host->store_rates) {
923     rates = uc_get_rate(ds, vl);
924     if (rates == NULL) {
925       ERROR("write_sensu plugin: uc_get_rate failed.");
926       pthread_mutex_unlock(&host->lock);
927       return -1;
928     }
929   }
930   for (size_t i = 0; i < vl->values_len; i++) {
931     msg = sensu_value_to_json(host, ds, vl, (int)i, rates, statuses[i]);
932     if (msg == NULL) {
933       sfree(rates);
934       pthread_mutex_unlock(&host->lock);
935       return -1;
936     }
937     status = sensu_send(host, msg);
938     free(msg);
939     if (status != 0) {
940       ERROR("write_sensu plugin: sensu_send failed with status %i", status);
941       pthread_mutex_unlock(&host->lock);
942       sfree(rates);
943       return status;
944     }
945   }
946   sfree(rates);
947   pthread_mutex_unlock(&host->lock);
948   return status;
949 } /* }}} int sensu_write */
950
951 static int sensu_notification(const notification_t *n,
952                               user_data_t *ud) /* {{{ */
953 {
954   int status;
955   struct sensu_host *host = ud->data;
956   char *msg;
957
958   pthread_mutex_lock(&host->lock);
959
960   msg = sensu_notification_to_json(host, n);
961   if (msg == NULL) {
962     pthread_mutex_unlock(&host->lock);
963     return -1;
964   }
965
966   status = sensu_send(host, msg);
967   free(msg);
968   if (status != 0)
969     ERROR("write_sensu plugin: sensu_send failed with status %i", status);
970   pthread_mutex_unlock(&host->lock);
971
972   return status;
973 } /* }}} int sensu_notification */
974
975 static void sensu_free(void *p) /* {{{ */
976 {
977   struct sensu_host *host = p;
978
979   if (host == NULL)
980     return;
981
982   pthread_mutex_lock(&host->lock);
983
984   host->reference_count--;
985   if (host->reference_count > 0) {
986     pthread_mutex_unlock(&host->lock);
987     return;
988   }
989
990   sensu_close_socket(host);
991   if (host->res != NULL) {
992     freeaddrinfo(host->res);
993     host->res = NULL;
994   }
995   sfree(host->service);
996   sfree(host->event_service_prefix);
997   sfree(host->name);
998   sfree(host->node);
999   sfree(host->separator);
1000   free_str_list(&(host->metric_handlers));
1001   free_str_list(&(host->notification_handlers));
1002
1003   pthread_mutex_unlock(&host->lock);
1004   pthread_mutex_destroy(&host->lock);
1005
1006   sfree(host);
1007 } /* }}} void sensu_free */
1008
1009 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
1010 {
1011   struct sensu_host *host = NULL;
1012   int status = 0;
1013   oconfig_item_t *child;
1014   char callback_name[DATA_MAX_NAME_LEN];
1015
1016   if ((host = calloc(1, sizeof(*host))) == NULL) {
1017     ERROR("write_sensu plugin: calloc failed.");
1018     return ENOMEM;
1019   }
1020   pthread_mutex_init(&host->lock, NULL);
1021   host->reference_count = 1;
1022   host->node = NULL;
1023   host->service = NULL;
1024   host->notifications = 0;
1025   host->metrics = 0;
1026   host->store_rates = 1;
1027   host->always_append_ds = 0;
1028   host->metric_handlers.nb_strs = 0;
1029   host->metric_handlers.strs = NULL;
1030   host->notification_handlers.nb_strs = 0;
1031   host->notification_handlers.strs = NULL;
1032   host->separator = strdup("/");
1033   if (host->separator == NULL) {
1034     ERROR("write_sensu plugin: Unable to alloc memory");
1035     sensu_free(host);
1036     return -1;
1037   }
1038
1039   status = cf_util_get_string(ci, &host->name);
1040   if (status != 0) {
1041     WARNING("write_sensu plugin: Required host name is missing.");
1042     sensu_free(host);
1043     return -1;
1044   }
1045
1046   for (int i = 0; i < ci->children_num; i++) {
1047     child = &ci->children[i];
1048     status = 0;
1049
1050     if (strcasecmp("Host", child->key) == 0) {
1051       status = cf_util_get_string(child, &host->node);
1052       if (status != 0)
1053         break;
1054     } else if (strcasecmp("Notifications", child->key) == 0) {
1055       status = cf_util_get_boolean(child, &host->notifications);
1056       if (status != 0)
1057         break;
1058     } else if (strcasecmp("Metrics", child->key) == 0) {
1059       status = cf_util_get_boolean(child, &host->metrics);
1060       if (status != 0)
1061         break;
1062     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1063       status = cf_util_get_string(child, &host->event_service_prefix);
1064       if (status != 0)
1065         break;
1066     } else if (strcasecmp("Separator", child->key) == 0) {
1067       status = cf_util_get_string(child, &host->separator);
1068       if (status != 0)
1069         break;
1070     } else if (strcasecmp("MetricHandler", child->key) == 0) {
1071       char *temp_str = NULL;
1072       status = cf_util_get_string(child, &temp_str);
1073       if (status != 0)
1074         break;
1075       status = add_str_to_list(&(host->metric_handlers), temp_str);
1076       free(temp_str);
1077       if (status != 0)
1078         break;
1079     } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1080       char *temp_str = NULL;
1081       status = cf_util_get_string(child, &temp_str);
1082       if (status != 0)
1083         break;
1084       status = add_str_to_list(&(host->notification_handlers), temp_str);
1085       free(temp_str);
1086       if (status != 0)
1087         break;
1088     } else if (strcasecmp("Port", child->key) == 0) {
1089       status = cf_util_get_service(child, &host->service);
1090       if (status != 0) {
1091         ERROR("write_sensu plugin: Invalid argument "
1092               "configured for the \"Port\" "
1093               "option.");
1094         break;
1095       }
1096     } else if (strcasecmp("StoreRates", child->key) == 0) {
1097       status = cf_util_get_boolean(child, &host->store_rates);
1098       if (status != 0)
1099         break;
1100     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1101       status = cf_util_get_boolean(child, &host->always_append_ds);
1102       if (status != 0)
1103         break;
1104     } else {
1105       WARNING("write_sensu plugin: ignoring unknown config "
1106               "option: \"%s\"",
1107               child->key);
1108     }
1109   }
1110   if (status != 0) {
1111     sensu_free(host);
1112     return status;
1113   }
1114
1115   if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1116     sensu_free(host);
1117     WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. "
1118             "Giving up.");
1119     return -1;
1120   }
1121
1122   if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1123     sensu_free(host);
1124     WARNING("write_sensu plugin: notifications enabled but no "
1125             "NotificationHandler defined. Giving up.");
1126     return -1;
1127   }
1128
1129   if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1130     WARNING("write_sensu plugin: NotificationHandler given so forcing "
1131             "notifications to be enabled");
1132     host->notifications = 1;
1133   }
1134
1135   if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1136     WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be "
1137             "enabled");
1138     host->metrics = 1;
1139   }
1140
1141   if (!(host->notifications || host->metrics)) {
1142     WARNING("write_sensu plugin: neither metrics nor notifications enabled. "
1143             "Giving up.");
1144     sensu_free(host);
1145     return -1;
1146   }
1147
1148   snprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1149
1150   user_data_t ud = {.data = host, .free_func = sensu_free};
1151
1152   pthread_mutex_lock(&host->lock);
1153
1154   if (host->metrics) {
1155     status = plugin_register_write(callback_name, sensu_write, &ud);
1156     if (status != 0)
1157       WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1158               "failed with status %i.",
1159               callback_name, status);
1160     else /* success */
1161       host->reference_count++;
1162   }
1163
1164   if (host->notifications) {
1165     status =
1166         plugin_register_notification(callback_name, sensu_notification, &ud);
1167     if (status != 0)
1168       WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1169               "failed with status %i.",
1170               callback_name, status);
1171     else
1172       host->reference_count++;
1173   }
1174
1175   if (host->reference_count <= 1) {
1176     /* Both callbacks failed => free memory.
1177      * We need to unlock here, because sensu_free() will lock.
1178      * This is not a race condition, because we're the only one
1179      * holding a reference. */
1180     pthread_mutex_unlock(&host->lock);
1181     sensu_free(host);
1182     return -1;
1183   }
1184
1185   host->reference_count--;
1186   pthread_mutex_unlock(&host->lock);
1187
1188   return status;
1189 } /* }}} int sensu_config_node */
1190
1191 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1192 {
1193   oconfig_item_t *child;
1194   int status;
1195   struct str_list sensu_tags_arr;
1196
1197   sensu_tags_arr.nb_strs = 0;
1198   sensu_tags_arr.strs = NULL;
1199
1200   for (int i = 0; i < ci->children_num; i++) {
1201     child = &ci->children[i];
1202
1203     if (strcasecmp("Node", child->key) == 0) {
1204       sensu_config_node(child);
1205     } else if (strcasecmp(child->key, "attribute") == 0) {
1206       if (child->values_num != 2) {
1207         WARNING("sensu attributes need both a key and a value.");
1208         continue;
1209       }
1210       if (child->values[0].type != OCONFIG_TYPE_STRING ||
1211           child->values[1].type != OCONFIG_TYPE_STRING) {
1212         WARNING("sensu attribute needs string arguments.");
1213         continue;
1214       }
1215
1216       strarray_add(&sensu_attrs, &sensu_attrs_num,
1217                    child->values[0].value.string);
1218       strarray_add(&sensu_attrs, &sensu_attrs_num,
1219                    child->values[1].value.string);
1220
1221       DEBUG("write_sensu plugin: New attribute: %s => %s",
1222             child->values[0].value.string, child->values[1].value.string);
1223     } else if (strcasecmp(child->key, "tag") == 0) {
1224       char *tmp = NULL;
1225       status = cf_util_get_string(child, &tmp);
1226       if (status != 0)
1227         continue;
1228
1229       status = add_str_to_list(&sensu_tags_arr, tmp);
1230       sfree(tmp);
1231       if (status != 0)
1232         continue;
1233       DEBUG("write_sensu plugin: Got tag: %s", tmp);
1234     } else {
1235       WARNING("write_sensu plugin: Ignoring unknown "
1236               "configuration option \"%s\" at top level.",
1237               child->key);
1238     }
1239   }
1240   if (sensu_tags_arr.nb_strs > 0) {
1241     sfree(sensu_tags);
1242     sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1243     free_str_list(&sensu_tags_arr);
1244     if (sensu_tags == NULL) {
1245       ERROR("write_sensu plugin: Unable to alloc memory");
1246       return -1;
1247     }
1248   }
1249   return 0;
1250 } /* }}} int sensu_config */
1251
1252 void module_register(void) {
1253   plugin_register_complex_config("write_sensu", sensu_config);
1254 }