Merge pull request #1843 from xinity/mysql-galera-stats
[collectd.git] / src / write_sensu.c
1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26
27 #define _GNU_SOURCE
28
29 #include "collectd.h"
30
31 #include "plugin.h"
32 #include "common.h"
33 #include "configfile.h"
34 #include "utils_cache.h"
35 #include <arpa/inet.h>
36 #include <errno.h>
37 #include <netdb.h>
38 #include <inttypes.h>
39 #include <stddef.h>
40
41 #include <stdlib.h>
42 #define SENSU_HOST              "localhost"
43 #define SENSU_PORT              "3030"
44
45 #ifdef HAVE_ASPRINTF
46 #define my_asprintf asprintf
47 #define my_vasprintf vasprintf
48 #else
49 /*
50  * asprintf() is available from Solaris 10 update 11.
51  * For older versions, use asprintf() portable implementation from
52  * https://github.com/littlstar/asprintf.c/blob/master/
53  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
54  */
55
56 static int my_vasprintf(char **str, const char *fmt, va_list args) {
57        int size = 0;
58        va_list tmpa;
59        // copy
60        va_copy(tmpa, args);
61        // apply variadic arguments to
62        // sprintf with format to get size
63        size = vsnprintf(NULL, size, fmt, tmpa);
64        // toss args
65        va_end(tmpa);
66        // return -1 to be compliant if
67        // size is less than 0
68        if (size < 0) { return -1; }
69        // alloc with size plus 1 for `\0'
70        *str = (char *) malloc(size + 1);
71        // return -1 to be compliant
72        // if pointer is `NULL'
73        if (NULL == *str) { return -1; }
74        // format string with original
75        // variadic arguments and set new size
76        size = vsprintf(*str, fmt, args);
77        return size;
78 }
79
80 static int my_asprintf(char **str, const char *fmt, ...) {
81        int size = 0;
82        va_list args;
83        // init variadic argumens
84        va_start(args, fmt);
85        // format and get size
86        size = my_vasprintf(str, fmt, args);
87        // toss args
88        va_end(args);
89        return size;
90 }
91
92 #endif
93
94 struct str_list {
95         int nb_strs;
96         char **strs;
97 };
98
99 struct sensu_host {
100         char                    *name;
101         char                    *event_service_prefix;
102         struct str_list metric_handlers;
103         struct str_list notification_handlers;
104 #define F_READY      0x01
105         uint8_t                  flags;
106         pthread_mutex_t  lock;
107         _Bool            notifications;
108         _Bool            metrics;
109         _Bool                    store_rates;
110         _Bool                    always_append_ds;
111         char                    *separator;
112         char                    *node;
113         char                    *service;
114         int              s;
115         struct addrinfo *res;
116         int                          reference_count;
117 };
118
119 static char     *sensu_tags = NULL;
120 static char     **sensu_attrs = NULL;
121 static size_t sensu_attrs_num;
122
123 static int add_str_to_list(struct str_list *strs,
124                 const char *str_to_add) /* {{{ */
125 {
126         char **old_strs_ptr = strs->strs;
127         char *newstr = strdup(str_to_add);
128         if (newstr == NULL) {
129                 ERROR("write_sensu plugin: Unable to alloc memory");
130                 return -1;
131         }
132         strs->strs = realloc(strs->strs, sizeof(char *) *(strs->nb_strs + 1));
133         if (strs->strs == NULL) {
134                 strs->strs = old_strs_ptr;
135                 free(newstr);
136                 ERROR("write_sensu plugin: Unable to alloc memory");
137                 return -1;
138         }
139         strs->strs[strs->nb_strs] = newstr;
140         strs->nb_strs++;
141         return 0;
142 }
143 /* }}} int add_str_to_list */
144
145 static void free_str_list(struct str_list *strs) /* {{{ */
146 {
147         for (int i=0; i<strs->nb_strs; i++)
148                 free(strs->strs[i]);
149         free(strs->strs);
150 }
151 /* }}} void free_str_list */
152
153 static int sensu_connect(struct sensu_host *host) /* {{{ */
154 {
155         int                      e;
156         char const              *node;
157         char const              *service;
158
159         // Resolve the target if we haven't done already
160         if (!(host->flags & F_READY)) {
161                 memset(&service, 0, sizeof(service));
162                 host->res = NULL;
163
164                 node = (host->node != NULL) ? host->node : SENSU_HOST;
165                 service = (host->service != NULL) ? host->service : SENSU_PORT;
166
167                 struct addrinfo ai_hints = {
168                         .ai_family = AF_INET,
169                         .ai_flags = AI_ADDRCONFIG,
170                         .ai_socktype = SOCK_STREAM
171                 };
172
173                 if ((e = getaddrinfo(node, service, &ai_hints, &(host->res))) != 0) {
174                         ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s",
175                                         node, gai_strerror(e));
176                         return -1;
177                 }
178                 DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s",
179                                 node, service);
180                 host->flags |= F_READY;
181         }
182
183         struct linger so_linger;
184         host->s = -1;
185         for (struct addrinfo *ai = host->res; ai != NULL; ai = ai->ai_next) {
186                 // create the socket
187                 if ((host->s = socket(ai->ai_family,
188                                       ai->ai_socktype,
189                                       ai->ai_protocol)) == -1) {
190                         continue;
191                 }
192
193                 // Set very low close() lingering
194                 so_linger.l_onoff = 1;
195                 so_linger.l_linger = 3;
196                 if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger) != 0)
197                         WARNING("write_sensu plugin: failed to set socket close() lingering");
198
199                 // connect the socket
200                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
201                         close(host->s);
202                         host->s = -1;
203                         continue;
204                 }
205                 DEBUG("write_sensu plugin: connected");
206                 break;
207         }
208
209         if (host->s < 0) {
210                 WARNING("write_sensu plugin: Unable to connect to sensu client");
211                 return -1;
212         }
213         return 0;
214 } /* }}} int sensu_connect */
215
216 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
217 {
218         if (host->s != -1)
219                 close(host->s);
220         host->s = -1;
221
222 } /* }}} void sensu_close_socket */
223
224 static char *build_json_str_list(const char *tag, struct str_list const *list) /* {{{ */
225 {
226         int res;
227         char *ret_str = NULL;
228         char *temp_str;
229         if (list->nb_strs == 0) {
230                 ret_str = malloc(sizeof(char));
231                 if (ret_str == NULL) {
232                         ERROR("write_sensu plugin: Unable to alloc memory");
233                         return NULL;
234                 }
235                 ret_str[0] = '\0';
236         }
237
238         res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
239         if (res == -1) {
240                 ERROR("write_sensu plugin: Unable to alloc memory");
241                 free(ret_str);
242                 return NULL;
243         }
244         for (int i=1; i<list->nb_strs; i++) {
245                 res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
246                 free(temp_str);
247                 if (res == -1) {
248                         ERROR("write_sensu plugin: Unable to alloc memory");
249                         return NULL;
250                 }
251                 temp_str = ret_str;
252         }
253         res = my_asprintf(&ret_str, "%s]", temp_str);
254         free(temp_str);
255         if (res == -1) {
256                 ERROR("write_sensu plugin: Unable to alloc memory");
257                 return NULL;
258         }
259
260         return ret_str;
261 } /* }}} char *build_json_str_list*/
262
263 static int sensu_format_name2(char *ret, int ret_len,
264                 const char *hostname,
265                 const char *plugin, const char *plugin_instance,
266                 const char *type, const char *type_instance,
267                 const char *separator)
268 {
269         char *buffer;
270         size_t buffer_size;
271
272         buffer = ret;
273         buffer_size = (size_t) ret_len;
274
275 #define APPEND(str) do {          \
276         size_t l = strlen (str);        \
277         if (l >= buffer_size)           \
278                 return (ENOBUFS);             \
279         memcpy (buffer, (str), l);      \
280         buffer += l; buffer_size -= l;  \
281 } while (0)
282
283         assert (plugin != NULL);
284         assert (type != NULL);
285
286         APPEND (hostname);
287         APPEND (separator);
288         APPEND (plugin);
289         if ((plugin_instance != NULL) && (plugin_instance[0] != 0))
290         {
291                 APPEND ("-");
292                 APPEND (plugin_instance);
293         }
294         APPEND (separator);
295         APPEND (type);
296         if ((type_instance != NULL) && (type_instance[0] != 0))
297         {
298                 APPEND ("-");
299                 APPEND (type_instance);
300         }
301         assert (buffer_size > 0);
302         buffer[0] = 0;
303
304 #undef APPEND
305         return (0);
306 } /* int sensu_format_name2 */
307
308 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
309 {
310         int len=strlen(orig_name);
311         for (int i=0; i<len; i++) {
312                 // some plugins like ipmi generate special characters in metric name
313                 switch(orig_name[i]) {
314                         case '(': orig_name[i] = '_'; break;
315                         case ')': orig_name[i] = '_'; break;
316                         case ' ': orig_name[i] = '_'; break;
317                         case '"': orig_name[i] = '_'; break;
318                         case '\'': orig_name[i] = '_'; break;
319                         case '+': orig_name[i] = '_'; break;
320                 }
321         }
322 } /* }}} char *replace_sensu_name_reserved */
323
324 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
325                 data_set_t const *ds,
326                 value_list_t const *vl, size_t index,
327                 gauge_t const *rates,
328                 int status)
329 {
330         char name_buffer[5 * DATA_MAX_NAME_LEN];
331         char service_buffer[6 * DATA_MAX_NAME_LEN];
332         char *ret_str;
333         char *temp_str;
334         char *value_str;
335         int res;
336         // First part of the JSON string
337         const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
338
339         char *handlers_str = build_json_str_list("handlers", &(host->metric_handlers));
340         if (handlers_str == NULL) {
341                 ERROR("write_sensu plugin: Unable to alloc memory");
342                 return NULL;
343         }
344
345         // incorporate the handlers
346         if (strlen(handlers_str) == 0) {
347                 free(handlers_str);
348                 ret_str = strdup(part1);
349                 if (ret_str == NULL) {
350                         ERROR("write_sensu plugin: Unable to alloc memory");
351                         return NULL;
352                 }
353         }
354         else {
355                 res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
356                 free(handlers_str);
357                 if (res == -1) {
358                         ERROR("write_sensu plugin: Unable to alloc memory");
359                         return NULL;
360                 }
361         }
362
363         // incorporate the plugin name information
364         res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, vl->plugin);
365         free(ret_str);
366         if (res == -1) {
367                 ERROR("write_sensu plugin: Unable to alloc memory");
368                 return NULL;
369         }
370         ret_str = temp_str;
371
372         // incorporate the plugin type
373         res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, vl->type);
374         free(ret_str);
375         if (res == -1) {
376                 ERROR("write_sensu plugin: Unable to alloc memory");
377                 return NULL;
378         }
379         ret_str = temp_str;
380
381         // incorporate the plugin instance if any
382         if (vl->plugin_instance[0] != 0) {
383                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, vl->plugin_instance);
384                 free(ret_str);
385                 if (res == -1) {
386                         ERROR("write_sensu plugin: Unable to alloc memory");
387                         return NULL;
388                 }
389                 ret_str = temp_str;
390         }
391
392         // incorporate the plugin type instance if any
393         if (vl->type_instance[0] != 0) {
394                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, vl->type_instance);
395                 free(ret_str);
396                 if (res == -1) {
397                         ERROR("write_sensu plugin: Unable to alloc memory");
398                         return NULL;
399                 }
400                 ret_str = temp_str;
401         }
402
403         // incorporate the data source type
404         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
405                 char ds_type[DATA_MAX_NAME_LEN];
406                 ssnprintf (ds_type, sizeof (ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type));
407                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, ds_type);
408                 free(ret_str);
409                 if (res == -1) {
410                         ERROR("write_sensu plugin: Unable to alloc memory");
411                         return NULL;
412                 }
413                 ret_str = temp_str;
414         } else {
415                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
416                 free(ret_str);
417                 if (res == -1) {
418                         ERROR("write_sensu plugin: Unable to alloc memory");
419                         return NULL;
420                 }
421                 ret_str = temp_str;
422         }
423
424         // incorporate the data source name
425         res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"", ret_str, ds->ds[index].name);
426         free(ret_str);
427         if (res == -1) {
428                 ERROR("write_sensu plugin: Unable to alloc memory");
429                 return NULL;
430         }
431         ret_str = temp_str;
432
433         // incorporate the data source index
434         {
435                 char ds_index[DATA_MAX_NAME_LEN];
436                 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
437                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s", ret_str, ds_index);
438                 free(ret_str);
439                 if (res == -1) {
440                         ERROR("write_sensu plugin: Unable to alloc memory");
441                         return NULL;
442                 }
443                 ret_str = temp_str;
444         }
445
446         // add key value attributes from config if any
447         for (size_t i = 0; i < sensu_attrs_num; i += 2) {
448                 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
449                 free(ret_str);
450                 if (res == -1) {
451                         ERROR("write_sensu plugin: Unable to alloc memory");
452                         return NULL;
453                 }
454                 ret_str = temp_str;
455         }
456
457         // incorporate sensu tags from config if any
458         if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
459                 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
460                 free(ret_str);
461                 if (res == -1) {
462                         ERROR("write_sensu plugin: Unable to alloc memory");
463                         return NULL;
464                 }
465                 ret_str = temp_str;
466         }
467
468         // calculate the value and set to a string
469         if (ds->ds[index].type == DS_TYPE_GAUGE) {
470                 res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
471                 if (res == -1) {
472                         free(ret_str);
473                         ERROR("write_sensu plugin: Unable to alloc memory");
474                         return NULL;
475                 }
476         } else if (rates != NULL) {
477                 res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
478                 if (res == -1) {
479                         free(ret_str);
480                         ERROR("write_sensu plugin: Unable to alloc memory");
481                         return NULL;
482                 }
483         } else {
484                 if (ds->ds[index].type == DS_TYPE_DERIVE) {
485                         res = my_asprintf(&value_str, "%"PRIi64, vl->values[index].derive);
486                         if (res == -1) {
487                                 free(ret_str);
488                                 ERROR("write_sensu plugin: Unable to alloc memory");
489                                 return NULL;
490                         }
491                 }
492                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
493                         res = my_asprintf(&value_str, "%"PRIu64, vl->values[index].absolute);
494                         if (res == -1) {
495                                 free(ret_str);
496                                 ERROR("write_sensu plugin: Unable to alloc memory");
497                                 return NULL;
498                         }
499                 }
500                 else {
501                         res = my_asprintf(&value_str, "%llu", vl->values[index].counter);
502                         if (res == -1) {
503                                 free(ret_str);
504                                 ERROR("write_sensu plugin: Unable to alloc memory");
505                                 return NULL;
506                         }
507                 }
508         }
509
510         // Generate the full service name
511         sensu_format_name2(name_buffer, sizeof(name_buffer),
512                 vl->host, vl->plugin, vl->plugin_instance,
513                 vl->type, vl->type_instance, host->separator);
514         if (host->always_append_ds || (ds->ds_num > 1)) {
515                 if (host->event_service_prefix == NULL)
516                         ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s",
517                                         name_buffer, ds->ds[index].name);
518                 else
519                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
520                                         host->event_service_prefix, name_buffer, ds->ds[index].name);
521         } else {
522                 if (host->event_service_prefix == NULL)
523                         sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
524                 else
525                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
526                                         host->event_service_prefix, name_buffer);
527         }
528
529         // Replace collectd sensor name reserved characters so that time series DB is happy
530         in_place_replace_sensu_name_reserved(service_buffer);
531
532         // finalize the buffer by setting the output and closing curly bracket
533         res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %ld\"}\n", ret_str, service_buffer, value_str, CDTIME_T_TO_TIME_T(vl->time));
534         free(ret_str);
535         free(value_str);
536         if (res == -1) {
537                 ERROR("write_sensu plugin: Unable to alloc memory");
538                 return NULL;
539         }
540         ret_str = temp_str;
541
542         DEBUG("write_sensu plugin: Successfully created json for metric: "
543                         "host = \"%s\", service = \"%s\"",
544                         vl->host, service_buffer);
545         return ret_str;
546 } /* }}} char *sensu_value_to_json */
547
548 /*
549  * Uses replace_str2() implementation from
550  * http://creativeandcritical.net/str-replace-c/
551  * copyright (c) Laird Shaw, under public domain.
552  */
553 static char *replace_str(const char *str, const char *old, /* {{{ */
554                 const char *new)
555 {
556         char *ret, *r;
557         const char *p, *q;
558         size_t oldlen = strlen(old);
559         size_t count = strlen(new);
560         size_t retlen;
561         size_t newlen = count;
562         int samesize = (oldlen == newlen);
563
564         if (!samesize) {
565                 for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
566                         count++;
567                 /* This is undefined if p - str > PTRDIFF_MAX */
568                 retlen = p - str + strlen(p) + count * (newlen - oldlen);
569         } else
570                 retlen = strlen(str);
571
572         ret = calloc(1, retlen + 1);
573         if (ret == NULL)
574                 return NULL;
575         // added to original: not optimized, but keeps valgrind happy.
576
577         r = ret;
578         p = str;
579         while (1) {
580                 /* If the old and new strings are different lengths - in other
581                  * words we have already iterated through with strstr above,
582                  * and thus we know how many times we need to call it - then we
583                  * can avoid the final (potentially lengthy) call to strstr,
584                  * which we already know is going to return NULL, by
585                  * decrementing and checking count.
586                  */
587                 if (!samesize && !count--)
588                         break;
589                 /* Otherwise i.e. when the old and new strings are the same
590                  * length, and we don't know how many times to call strstr,
591                  * we must check for a NULL return here (we check it in any
592                  * event, to avoid further conditions, and because there's
593                  * no harm done with the check even when the old and new
594                  * strings are different lengths).
595                  */
596                 if ((q = strstr(p, old)) == NULL)
597                         break;
598                 /* This is undefined if q - p > PTRDIFF_MAX */
599                 ptrdiff_t l = q - p;
600                 memcpy(r, p, l);
601                 r += l;
602                 memcpy(r, new, newlen);
603                 r += newlen;
604                 p = q + oldlen;
605         }
606         strncpy(r, p, strlen(p));
607
608         return ret;
609 } /* }}} char *replace_str */
610
611 static char *replace_json_reserved(const char *message) /* {{{ */
612 {
613         char *msg = replace_str(message, "\\", "\\\\");
614         if (msg == NULL) {
615                 ERROR("write_sensu plugin: Unable to alloc memory");
616                 return NULL;
617         }
618         char *tmp = replace_str(msg, "\"", "\\\"");
619         free(msg);
620         if (tmp == NULL) {
621                 ERROR("write_sensu plugin: Unable to alloc memory");
622                 return NULL;
623         }
624         msg = replace_str(tmp, "\n", "\\\n");
625         free(tmp);
626         if (msg == NULL) {
627                 ERROR("write_sensu plugin: Unable to alloc memory");
628                 return NULL;
629         }
630         return msg;
631 } /* }}} char *replace_json_reserved */
632
633 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
634                 notification_t const *n)
635 {
636         char service_buffer[6 * DATA_MAX_NAME_LEN];
637         char const *severity;
638         char *ret_str;
639         char *temp_str;
640         int status;
641         size_t i;
642         int res;
643         // add the severity/status
644         switch (n->severity) {
645                 case NOTIF_OKAY:
646                         severity = "OK";
647                         status = 0;
648                         break;
649                 case NOTIF_WARNING:
650                         severity = "WARNING";
651                         status = 1;
652                         break;
653                 case NOTIF_FAILURE:
654                         severity = "CRITICAL";
655                         status = 2;
656                         break;
657                 default:
658                         severity = "UNKNOWN";
659                         status = 3;
660         }
661         res = my_asprintf(&temp_str, "{\"status\": %d", status);
662         if (res == -1) {
663                 ERROR("write_sensu plugin: Unable to alloc memory");
664                 return NULL;
665         }
666         ret_str = temp_str;
667
668         // incorporate the timestamp
669         res = my_asprintf(&temp_str, "%s, \"timestamp\": %ld", ret_str, CDTIME_T_TO_TIME_T(n->time));
670         free(ret_str);
671         if (res == -1) {
672                 ERROR("write_sensu plugin: Unable to alloc memory");
673                 return NULL;
674         }
675         ret_str = temp_str;
676
677         char *handlers_str = build_json_str_list("handlers", &(host->notification_handlers));
678         if (handlers_str == NULL) {
679                 ERROR("write_sensu plugin: Unable to alloc memory");
680                 free(ret_str);
681                 return NULL;
682         }
683         // incorporate the handlers
684         if (strlen(handlers_str) != 0) {
685                 res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
686                 free(ret_str);
687                 free(handlers_str);
688                 if (res == -1) {
689                         ERROR("write_sensu plugin: Unable to alloc memory");
690                         return NULL;
691                 }
692                 ret_str = temp_str;
693         } else {
694                 free(handlers_str);
695         }
696
697         // incorporate the plugin name information if any
698         if (n->plugin[0] != 0) {
699                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, n->plugin);
700                 free(ret_str);
701                 if (res == -1) {
702                         ERROR("write_sensu plugin: Unable to alloc memory");
703                         return NULL;
704                 }
705                 ret_str = temp_str;
706         }
707
708         // incorporate the plugin type if any
709         if (n->type[0] != 0) {
710                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, n->type);
711                 free(ret_str);
712                 if (res == -1) {
713                         ERROR("write_sensu plugin: Unable to alloc memory");
714                         return NULL;
715                 }
716                 ret_str = temp_str;
717         }
718
719         // incorporate the plugin instance if any
720         if (n->plugin_instance[0] != 0) {
721                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, n->plugin_instance);
722                 free(ret_str);
723                 if (res == -1) {
724                         ERROR("write_sensu plugin: Unable to alloc memory");
725                         return NULL;
726                 }
727                 ret_str = temp_str;
728         }
729
730         // incorporate the plugin type instance if any
731         if (n->type_instance[0] != 0) {
732                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, n->type_instance);
733                 free(ret_str);
734                 if (res == -1) {
735                         ERROR("write_sensu plugin: Unable to alloc memory");
736                         return NULL;
737                 }
738                 ret_str = temp_str;
739         }
740
741         // add key value attributes from config if any
742         for (i = 0; i < sensu_attrs_num; i += 2) {
743                 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
744                 free(ret_str);
745                 if (res == -1) {
746                         ERROR("write_sensu plugin: Unable to alloc memory");
747                         return NULL;
748                 }
749                 ret_str = temp_str;
750         }
751
752         // incorporate sensu tags from config if any
753         if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
754                 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
755                 free(ret_str);
756                 if (res == -1) {
757                         ERROR("write_sensu plugin: Unable to alloc memory");
758                         return NULL;
759                 }
760                 ret_str = temp_str;
761         }
762
763         // incorporate the service name
764         sensu_format_name2(service_buffer, sizeof(service_buffer),
765                                 /* host */ "", n->plugin, n->plugin_instance,
766                                 n->type, n->type_instance, host->separator);
767         // replace sensu event name chars that are considered illegal
768         in_place_replace_sensu_name_reserved(service_buffer);
769         res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str, &service_buffer[1]);
770         free(ret_str);
771         if (res == -1) {
772                 ERROR("write_sensu plugin: Unable to alloc memory");
773                 return NULL;
774         }
775         ret_str = temp_str;
776
777         // incorporate the check output
778         if (n->message[0] != 0) {
779                 char *msg = replace_json_reserved(n->message);
780                 if (msg == NULL) {
781                         ERROR("write_sensu plugin: Unable to alloc memory");
782                         free(ret_str);
783                         return NULL;
784                 }
785                 res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str, severity, msg);
786                 free(ret_str);
787                 free(msg);
788                 if (res == -1) {
789                         ERROR("write_sensu plugin: Unable to alloc memory");
790                         return NULL;
791                 }
792                 ret_str = temp_str;
793         }
794
795         // Pull in values from threshold and add extra attributes
796         for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
797                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) {
798                         res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str, meta->nm_value.nm_double);
799                         free(ret_str);
800                         if (res == -1) {
801                                 ERROR("write_sensu plugin: Unable to alloc memory");
802                                 return NULL;
803                         }
804                         ret_str = temp_str;
805                 }
806                 if (meta->type == NM_TYPE_STRING) {
807                         res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name, meta->nm_value.nm_string);
808                         free(ret_str);
809                         if (res == -1) {
810                                 ERROR("write_sensu plugin: Unable to alloc memory");
811                                 return NULL;
812                         }
813                         ret_str = temp_str;
814                 }
815         }
816
817         // close the curly bracket
818         res = my_asprintf(&temp_str, "%s}\n", ret_str);
819         free(ret_str);
820         if (res == -1) {
821                 ERROR("write_sensu plugin: Unable to alloc memory");
822                 return NULL;
823         }
824         ret_str = temp_str;
825
826         DEBUG("write_sensu plugin: Successfully created JSON for notification: "
827                                 "host = \"%s\", service = \"%s\", state = \"%s\"",
828                                 n->host, service_buffer, severity);
829         return ret_str;
830 } /* }}} char *sensu_notification_to_json */
831
832 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
833 {
834         int status = 0;
835         size_t  buffer_len;
836
837         status = sensu_connect(host);
838         if (status != 0)
839                 return status;
840
841         buffer_len = strlen(msg);
842
843         status = (int) swrite(host->s, msg, buffer_len);
844         sensu_close_socket(host);
845
846         if (status != 0) {
847                 char errbuf[1024];
848                 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
849                                 (host->node != NULL) ? host->node : SENSU_HOST,
850                                 (host->service != NULL) ? host->service : SENSU_PORT,
851                                 sstrerror(errno, errbuf, sizeof(errbuf)));
852                 return -1;
853         }
854
855         return 0;
856 } /* }}} int sensu_send_msg */
857
858
859 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
860 {
861         int status = 0;
862
863         status = sensu_send_msg(host, msg);
864         if (status != 0) {
865                 host->flags &= ~F_READY;
866                 if (host->res != NULL) {
867                         freeaddrinfo(host->res);
868                         host->res = NULL;
869                 }
870                 return status;
871         }
872
873         return 0;
874 } /* }}} int sensu_send */
875
876
877 static int sensu_write(const data_set_t *ds, /* {{{ */
878               const value_list_t *vl,
879               user_data_t *ud)
880 {
881         int status = 0;
882         int statuses[vl->values_len];
883         struct sensu_host       *host = ud->data;
884         gauge_t *rates = NULL;
885         char *msg;
886
887         pthread_mutex_lock(&host->lock);
888         memset(statuses, 0, vl->values_len * sizeof(*statuses));
889
890         if (host->store_rates) {
891                 rates = uc_get_rate(ds, vl);
892                 if (rates == NULL) {
893                         ERROR("write_sensu plugin: uc_get_rate failed.");
894                         pthread_mutex_unlock(&host->lock);
895                         return -1;
896                 }
897         }
898         for (size_t i = 0; i < vl->values_len; i++) {
899                 msg = sensu_value_to_json(host, ds, vl, (int) i, rates, statuses[i]);
900                 if (msg == NULL) {
901                         sfree(rates);
902                         pthread_mutex_unlock(&host->lock);
903                         return -1;
904                 }
905                 status = sensu_send(host, msg);
906                 free(msg);
907                 if (status != 0) {
908                         ERROR("write_sensu plugin: sensu_send failed with status %i", status);
909                         pthread_mutex_unlock(&host->lock);
910                         sfree(rates);
911                         return status;
912                 }
913         }
914         sfree(rates);
915         pthread_mutex_unlock(&host->lock);
916         return status;
917 } /* }}} int sensu_write */
918
919 static int sensu_notification(const notification_t *n, user_data_t *ud) /* {{{ */
920 {
921         int     status;
922         struct sensu_host *host = ud->data;
923         char *msg;
924
925         pthread_mutex_lock(&host->lock);
926
927         msg = sensu_notification_to_json(host, n);
928         if (msg == NULL) {
929                 pthread_mutex_unlock(&host->lock);
930                 return -1;
931         }
932
933         status = sensu_send(host, msg);
934         free(msg);
935         if (status != 0)
936                 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
937         pthread_mutex_unlock(&host->lock);
938
939         return status;
940 } /* }}} int sensu_notification */
941
942 static void sensu_free(void *p) /* {{{ */
943 {
944         struct sensu_host *host = p;
945
946         if (host == NULL)
947                 return;
948
949         pthread_mutex_lock(&host->lock);
950
951         host->reference_count--;
952         if (host->reference_count > 0) {
953                 pthread_mutex_unlock(&host->lock);
954                 return;
955         }
956
957         sensu_close_socket(host);
958         if (host->res != NULL) {
959                 freeaddrinfo(host->res);
960                 host->res = NULL;
961         }
962         sfree(host->service);
963         sfree(host->event_service_prefix);
964         sfree(host->name);
965         sfree(host->node);
966         sfree(host->separator);
967         free_str_list(&(host->metric_handlers));
968         free_str_list(&(host->notification_handlers));
969         pthread_mutex_destroy(&host->lock);
970         sfree(host);
971 } /* }}} void sensu_free */
972
973
974 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
975 {
976         struct sensu_host       *host = NULL;
977         int                                     status = 0;
978         oconfig_item_t          *child;
979         char                            callback_name[DATA_MAX_NAME_LEN];
980         user_data_t                     ud;
981
982         if ((host = calloc(1, sizeof(*host))) == NULL) {
983                 ERROR("write_sensu plugin: calloc failed.");
984                 return ENOMEM;
985         }
986         pthread_mutex_init(&host->lock, NULL);
987         host->reference_count = 1;
988         host->node = NULL;
989         host->service = NULL;
990         host->notifications = 0;
991         host->metrics = 0;
992         host->store_rates = 1;
993         host->always_append_ds = 0;
994         host->metric_handlers.nb_strs = 0;
995         host->metric_handlers.strs = NULL;
996         host->notification_handlers.nb_strs = 0;
997         host->notification_handlers.strs = NULL;
998         host->separator = strdup("/");
999         if (host->separator == NULL) {
1000                 ERROR("write_sensu plugin: Unable to alloc memory");
1001                 sensu_free(host);
1002                 return -1;
1003         }
1004
1005         status = cf_util_get_string(ci, &host->name);
1006         if (status != 0) {
1007                 WARNING("write_sensu plugin: Required host name is missing.");
1008                 sensu_free(host);
1009                 return -1;
1010         }
1011
1012         for (int i = 0; i < ci->children_num; i++) {
1013                 child = &ci->children[i];
1014                 status = 0;
1015
1016                 if (strcasecmp("Host", child->key) == 0) {
1017                         status = cf_util_get_string(child, &host->node);
1018                         if (status != 0)
1019                                 break;
1020                 } else if (strcasecmp("Notifications", child->key) == 0) {
1021                         status = cf_util_get_boolean(child, &host->notifications);
1022                         if (status != 0)
1023                                 break;
1024                 } else if (strcasecmp("Metrics", child->key) == 0) {
1025                                         status = cf_util_get_boolean(child, &host->metrics);
1026                                         if (status != 0)
1027                                                 break;
1028                 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1029                         status = cf_util_get_string(child, &host->event_service_prefix);
1030                         if (status != 0)
1031                                 break;
1032                 } else if (strcasecmp("Separator", child->key) == 0) {
1033                                 status = cf_util_get_string(child, &host->separator);
1034                                 if (status != 0)
1035                                         break;
1036                 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1037                         char *temp_str = NULL;
1038                         status = cf_util_get_string(child, &temp_str);
1039                         if (status != 0)
1040                                 break;
1041                         status = add_str_to_list(&(host->metric_handlers), temp_str);
1042                         free(temp_str);
1043                         if (status != 0)
1044                                 break;
1045                 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1046                         char *temp_str = NULL;
1047                         status = cf_util_get_string(child, &temp_str);
1048                         if (status != 0)
1049                                 break;
1050                         status = add_str_to_list(&(host->notification_handlers), temp_str);
1051                         free(temp_str);
1052                         if (status != 0)
1053                                 break;
1054                 } else if (strcasecmp("Port", child->key) == 0) {
1055                         status = cf_util_get_service(child, &host->service);
1056                         if (status != 0) {
1057                                 ERROR("write_sensu plugin: Invalid argument "
1058                                                 "configured for the \"Port\" "
1059                                                 "option.");
1060                                 break;
1061                         }
1062                 } else if (strcasecmp("StoreRates", child->key) == 0) {
1063                         status = cf_util_get_boolean(child, &host->store_rates);
1064                         if (status != 0)
1065                                 break;
1066                 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1067                         status = cf_util_get_boolean(child,
1068                                         &host->always_append_ds);
1069                         if (status != 0)
1070                                 break;
1071                 } else {
1072                         WARNING("write_sensu plugin: ignoring unknown config "
1073                                 "option: \"%s\"", child->key);
1074                 }
1075         }
1076         if (status != 0) {
1077                 sensu_free(host);
1078                 return status;
1079         }
1080
1081         if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1082                         sensu_free(host);
1083                         WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. Giving up.");
1084                         return -1;
1085                 }
1086
1087         if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1088                 sensu_free(host);
1089                 WARNING("write_sensu plugin: notifications enabled but no NotificationHandler defined. Giving up.");
1090                 return -1;
1091         }
1092
1093         if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1094                 WARNING("write_sensu plugin: NotificationHandler given so forcing notifications to be enabled");
1095                 host->notifications = 1;
1096         }
1097
1098         if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1099                 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be enabled");
1100                 host->metrics = 1;
1101         }
1102
1103         if (!(host->notifications || host->metrics)) {
1104                 WARNING("write_sensu plugin: neither metrics nor notifications enabled. Giving up.");
1105                 sensu_free(host);
1106                 return -1;
1107         }
1108
1109         ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1110         ud.data = host;
1111         ud.free_func = sensu_free;
1112
1113         pthread_mutex_lock(&host->lock);
1114
1115         if (host->metrics) {
1116                 status = plugin_register_write(callback_name, sensu_write, &ud);
1117                 if (status != 0)
1118                         WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1119                                         "failed with status %i.",
1120                                         callback_name, status);
1121                 else /* success */
1122                         host->reference_count++;
1123         }
1124
1125         if (host->notifications) {
1126                 status = plugin_register_notification(callback_name, sensu_notification, &ud);
1127                 if (status != 0)
1128                         WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1129                                         "failed with status %i.",
1130                                         callback_name, status);
1131                 else
1132                         host->reference_count++;
1133         }
1134
1135         if (host->reference_count <= 1) {
1136                 /* Both callbacks failed => free memory.
1137                  * We need to unlock here, because sensu_free() will lock.
1138                  * This is not a race condition, because we're the only one
1139                  * holding a reference. */
1140                 pthread_mutex_unlock(&host->lock);
1141                 sensu_free(host);
1142                 return -1;
1143         }
1144
1145         host->reference_count--;
1146         pthread_mutex_unlock(&host->lock);
1147
1148         return status;
1149 } /* }}} int sensu_config_node */
1150
1151 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1152 {
1153         oconfig_item_t  *child;
1154         int              status;
1155         struct str_list sensu_tags_arr;
1156
1157         sensu_tags_arr.nb_strs = 0;
1158         sensu_tags_arr.strs = NULL;
1159
1160         for (int i = 0; i < ci->children_num; i++)  {
1161                 child = &ci->children[i];
1162
1163                 if (strcasecmp("Node", child->key) == 0) {
1164                         sensu_config_node(child);
1165                 } else if (strcasecmp(child->key, "attribute") == 0) {
1166                         if (child->values_num != 2) {
1167                                 WARNING("sensu attributes need both a key and a value.");
1168                                 continue;
1169                         }
1170                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
1171                                         child->values[1].type != OCONFIG_TYPE_STRING) {
1172                                 WARNING("sensu attribute needs string arguments.");
1173                                 continue;
1174                         }
1175
1176                         strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[0].value.string);
1177                         strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[1].value.string);
1178
1179                         DEBUG("write_sensu plugin: New attribute: %s => %s",
1180                                         child->values[0].value.string,
1181                                         child->values[1].value.string);
1182                 } else if (strcasecmp(child->key, "tag") == 0) {
1183                         char *tmp = NULL;
1184                         status = cf_util_get_string(child, &tmp);
1185                         if (status != 0)
1186                                 continue;
1187
1188                         status = add_str_to_list(&sensu_tags_arr, tmp);
1189                         sfree(tmp);
1190                         if (status != 0)
1191                                 continue;
1192                         DEBUG("write_sensu plugin: Got tag: %s", tmp);
1193                 } else {
1194                         WARNING("write_sensu plugin: Ignoring unknown "
1195                                  "configuration option \"%s\" at top level.",
1196                                  child->key);
1197                 }
1198         }
1199         if (sensu_tags_arr.nb_strs > 0) {
1200                 sfree (sensu_tags);
1201                 sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1202                 free_str_list(&sensu_tags_arr);
1203                 if (sensu_tags == NULL) {
1204                         ERROR("write_sensu plugin: Unable to alloc memory");
1205                         return -1;
1206                 }
1207         }
1208         return 0;
1209 } /* }}} int sensu_config */
1210
1211 void module_register(void)
1212 {
1213         plugin_register_complex_config("write_sensu", sensu_config);
1214 }
1215
1216 /* vim: set sw=8 sts=8 ts=8 noet : */