Merge pull request #1873 from rubenk/fix-duplicate-label_t
[collectd.git] / src / write_sensu.c
1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26
27 #define _GNU_SOURCE
28
29 #include "collectd.h"
30
31 #include "plugin.h"
32 #include "common.h"
33 #include "utils_cache.h"
34 #include <arpa/inet.h>
35 #include <errno.h>
36 #include <netdb.h>
37 #include <inttypes.h>
38 #include <stddef.h>
39
40 #include <stdlib.h>
41 #define SENSU_HOST              "localhost"
42 #define SENSU_PORT              "3030"
43
44 #ifdef HAVE_ASPRINTF
45 #define my_asprintf asprintf
46 #define my_vasprintf vasprintf
47 #else
48 /*
49  * asprintf() is available from Solaris 10 update 11.
50  * For older versions, use asprintf() portable implementation from
51  * https://github.com/littlstar/asprintf.c/blob/master/
52  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
53  */
54
55 static int my_vasprintf(char **str, const char *fmt, va_list args) {
56        int size = 0;
57        va_list tmpa;
58        // copy
59        va_copy(tmpa, args);
60        // apply variadic arguments to
61        // sprintf with format to get size
62        size = vsnprintf(NULL, size, fmt, tmpa);
63        // toss args
64        va_end(tmpa);
65        // return -1 to be compliant if
66        // size is less than 0
67        if (size < 0) { return -1; }
68        // alloc with size plus 1 for `\0'
69        *str = (char *) malloc(size + 1);
70        // return -1 to be compliant
71        // if pointer is `NULL'
72        if (NULL == *str) { return -1; }
73        // format string with original
74        // variadic arguments and set new size
75        size = vsprintf(*str, fmt, args);
76        return size;
77 }
78
79 static int my_asprintf(char **str, const char *fmt, ...) {
80        int size = 0;
81        va_list args;
82        // init variadic argumens
83        va_start(args, fmt);
84        // format and get size
85        size = my_vasprintf(str, fmt, args);
86        // toss args
87        va_end(args);
88        return size;
89 }
90
91 #endif
92
93 struct str_list {
94         int nb_strs;
95         char **strs;
96 };
97
98 struct sensu_host {
99         char                    *name;
100         char                    *event_service_prefix;
101         struct str_list metric_handlers;
102         struct str_list notification_handlers;
103 #define F_READY      0x01
104         uint8_t                  flags;
105         pthread_mutex_t  lock;
106         _Bool            notifications;
107         _Bool            metrics;
108         _Bool                    store_rates;
109         _Bool                    always_append_ds;
110         char                    *separator;
111         char                    *node;
112         char                    *service;
113         int              s;
114         struct addrinfo *res;
115         int                          reference_count;
116 };
117
118 static char     *sensu_tags = NULL;
119 static char     **sensu_attrs = NULL;
120 static size_t sensu_attrs_num;
121
122 static int add_str_to_list(struct str_list *strs,
123                 const char *str_to_add) /* {{{ */
124 {
125         char **old_strs_ptr = strs->strs;
126         char *newstr = strdup(str_to_add);
127         if (newstr == NULL) {
128                 ERROR("write_sensu plugin: Unable to alloc memory");
129                 return -1;
130         }
131         strs->strs = realloc(strs->strs, sizeof(char *) *(strs->nb_strs + 1));
132         if (strs->strs == NULL) {
133                 strs->strs = old_strs_ptr;
134                 free(newstr);
135                 ERROR("write_sensu plugin: Unable to alloc memory");
136                 return -1;
137         }
138         strs->strs[strs->nb_strs] = newstr;
139         strs->nb_strs++;
140         return 0;
141 }
142 /* }}} int add_str_to_list */
143
144 static void free_str_list(struct str_list *strs) /* {{{ */
145 {
146         for (int i=0; i<strs->nb_strs; i++)
147                 free(strs->strs[i]);
148         free(strs->strs);
149 }
150 /* }}} void free_str_list */
151
152 static int sensu_connect(struct sensu_host *host) /* {{{ */
153 {
154         int                      e;
155         char const              *node;
156         char const              *service;
157
158         // Resolve the target if we haven't done already
159         if (!(host->flags & F_READY)) {
160                 memset(&service, 0, sizeof(service));
161                 host->res = NULL;
162
163                 node = (host->node != NULL) ? host->node : SENSU_HOST;
164                 service = (host->service != NULL) ? host->service : SENSU_PORT;
165
166                 struct addrinfo ai_hints = {
167                         .ai_family = AF_INET,
168                         .ai_flags = AI_ADDRCONFIG,
169                         .ai_socktype = SOCK_STREAM
170                 };
171
172                 if ((e = getaddrinfo(node, service, &ai_hints, &(host->res))) != 0) {
173                         ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s",
174                                         node, gai_strerror(e));
175                         return -1;
176                 }
177                 DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s",
178                                 node, service);
179                 host->flags |= F_READY;
180         }
181
182         struct linger so_linger;
183         host->s = -1;
184         for (struct addrinfo *ai = host->res; ai != NULL; ai = ai->ai_next) {
185                 // create the socket
186                 if ((host->s = socket(ai->ai_family,
187                                       ai->ai_socktype,
188                                       ai->ai_protocol)) == -1) {
189                         continue;
190                 }
191
192                 // Set very low close() lingering
193                 so_linger.l_onoff = 1;
194                 so_linger.l_linger = 3;
195                 if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger) != 0)
196                         WARNING("write_sensu plugin: failed to set socket close() lingering");
197
198                 set_sock_opts(host->s);
199
200                 // connect the socket
201                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
202                         close(host->s);
203                         host->s = -1;
204                         continue;
205                 }
206                 DEBUG("write_sensu plugin: connected");
207                 break;
208         }
209
210         if (host->s < 0) {
211                 WARNING("write_sensu plugin: Unable to connect to sensu client");
212                 return -1;
213         }
214         return 0;
215 } /* }}} int sensu_connect */
216
217 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
218 {
219         if (host->s != -1)
220                 close(host->s);
221         host->s = -1;
222
223 } /* }}} void sensu_close_socket */
224
225 static char *build_json_str_list(const char *tag, struct str_list const *list) /* {{{ */
226 {
227         int res;
228         char *ret_str = NULL;
229         char *temp_str;
230         if (list->nb_strs == 0) {
231                 ret_str = malloc(sizeof(char));
232                 if (ret_str == NULL) {
233                         ERROR("write_sensu plugin: Unable to alloc memory");
234                         return NULL;
235                 }
236                 ret_str[0] = '\0';
237         }
238
239         res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
240         if (res == -1) {
241                 ERROR("write_sensu plugin: Unable to alloc memory");
242                 free(ret_str);
243                 return NULL;
244         }
245         for (int i=1; i<list->nb_strs; i++) {
246                 res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
247                 free(temp_str);
248                 if (res == -1) {
249                         ERROR("write_sensu plugin: Unable to alloc memory");
250                         return NULL;
251                 }
252                 temp_str = ret_str;
253         }
254         res = my_asprintf(&ret_str, "%s]", temp_str);
255         free(temp_str);
256         if (res == -1) {
257                 ERROR("write_sensu plugin: Unable to alloc memory");
258                 return NULL;
259         }
260
261         return ret_str;
262 } /* }}} char *build_json_str_list*/
263
264 static int sensu_format_name2(char *ret, int ret_len,
265                 const char *hostname,
266                 const char *plugin, const char *plugin_instance,
267                 const char *type, const char *type_instance,
268                 const char *separator)
269 {
270         char *buffer;
271         size_t buffer_size;
272
273         buffer = ret;
274         buffer_size = (size_t) ret_len;
275
276 #define APPEND(str) do {          \
277         size_t l = strlen (str);        \
278         if (l >= buffer_size)           \
279                 return (ENOBUFS);             \
280         memcpy (buffer, (str), l);      \
281         buffer += l; buffer_size -= l;  \
282 } while (0)
283
284         assert (plugin != NULL);
285         assert (type != NULL);
286
287         APPEND (hostname);
288         APPEND (separator);
289         APPEND (plugin);
290         if ((plugin_instance != NULL) && (plugin_instance[0] != 0))
291         {
292                 APPEND ("-");
293                 APPEND (plugin_instance);
294         }
295         APPEND (separator);
296         APPEND (type);
297         if ((type_instance != NULL) && (type_instance[0] != 0))
298         {
299                 APPEND ("-");
300                 APPEND (type_instance);
301         }
302         assert (buffer_size > 0);
303         buffer[0] = 0;
304
305 #undef APPEND
306         return (0);
307 } /* int sensu_format_name2 */
308
309 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
310 {
311         int len=strlen(orig_name);
312         for (int i=0; i<len; i++) {
313                 // some plugins like ipmi generate special characters in metric name
314                 switch(orig_name[i]) {
315                         case '(': orig_name[i] = '_'; break;
316                         case ')': orig_name[i] = '_'; break;
317                         case ' ': orig_name[i] = '_'; break;
318                         case '"': orig_name[i] = '_'; break;
319                         case '\'': orig_name[i] = '_'; break;
320                         case '+': orig_name[i] = '_'; break;
321                 }
322         }
323 } /* }}} char *replace_sensu_name_reserved */
324
325 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
326                 data_set_t const *ds,
327                 value_list_t const *vl, size_t index,
328                 gauge_t const *rates,
329                 int status)
330 {
331         char name_buffer[5 * DATA_MAX_NAME_LEN];
332         char service_buffer[6 * DATA_MAX_NAME_LEN];
333         char *ret_str;
334         char *temp_str;
335         char *value_str;
336         int res;
337         // First part of the JSON string
338         const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
339
340         char *handlers_str = build_json_str_list("handlers", &(host->metric_handlers));
341         if (handlers_str == NULL) {
342                 ERROR("write_sensu plugin: Unable to alloc memory");
343                 return NULL;
344         }
345
346         // incorporate the handlers
347         if (strlen(handlers_str) == 0) {
348                 free(handlers_str);
349                 ret_str = strdup(part1);
350                 if (ret_str == NULL) {
351                         ERROR("write_sensu plugin: Unable to alloc memory");
352                         return NULL;
353                 }
354         }
355         else {
356                 res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
357                 free(handlers_str);
358                 if (res == -1) {
359                         ERROR("write_sensu plugin: Unable to alloc memory");
360                         return NULL;
361                 }
362         }
363
364         // incorporate the plugin name information
365         res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, vl->plugin);
366         free(ret_str);
367         if (res == -1) {
368                 ERROR("write_sensu plugin: Unable to alloc memory");
369                 return NULL;
370         }
371         ret_str = temp_str;
372
373         // incorporate the plugin type
374         res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, vl->type);
375         free(ret_str);
376         if (res == -1) {
377                 ERROR("write_sensu plugin: Unable to alloc memory");
378                 return NULL;
379         }
380         ret_str = temp_str;
381
382         // incorporate the plugin instance if any
383         if (vl->plugin_instance[0] != 0) {
384                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, vl->plugin_instance);
385                 free(ret_str);
386                 if (res == -1) {
387                         ERROR("write_sensu plugin: Unable to alloc memory");
388                         return NULL;
389                 }
390                 ret_str = temp_str;
391         }
392
393         // incorporate the plugin type instance if any
394         if (vl->type_instance[0] != 0) {
395                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, vl->type_instance);
396                 free(ret_str);
397                 if (res == -1) {
398                         ERROR("write_sensu plugin: Unable to alloc memory");
399                         return NULL;
400                 }
401                 ret_str = temp_str;
402         }
403
404         // incorporate the data source type
405         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
406                 char ds_type[DATA_MAX_NAME_LEN];
407                 ssnprintf (ds_type, sizeof (ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type));
408                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, ds_type);
409                 free(ret_str);
410                 if (res == -1) {
411                         ERROR("write_sensu plugin: Unable to alloc memory");
412                         return NULL;
413                 }
414                 ret_str = temp_str;
415         } else {
416                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
417                 free(ret_str);
418                 if (res == -1) {
419                         ERROR("write_sensu plugin: Unable to alloc memory");
420                         return NULL;
421                 }
422                 ret_str = temp_str;
423         }
424
425         // incorporate the data source name
426         res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"", ret_str, ds->ds[index].name);
427         free(ret_str);
428         if (res == -1) {
429                 ERROR("write_sensu plugin: Unable to alloc memory");
430                 return NULL;
431         }
432         ret_str = temp_str;
433
434         // incorporate the data source index
435         {
436                 char ds_index[DATA_MAX_NAME_LEN];
437                 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
438                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s", ret_str, ds_index);
439                 free(ret_str);
440                 if (res == -1) {
441                         ERROR("write_sensu plugin: Unable to alloc memory");
442                         return NULL;
443                 }
444                 ret_str = temp_str;
445         }
446
447         // add key value attributes from config if any
448         for (size_t i = 0; i < sensu_attrs_num; i += 2) {
449                 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
450                 free(ret_str);
451                 if (res == -1) {
452                         ERROR("write_sensu plugin: Unable to alloc memory");
453                         return NULL;
454                 }
455                 ret_str = temp_str;
456         }
457
458         // incorporate sensu tags from config if any
459         if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
460                 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
461                 free(ret_str);
462                 if (res == -1) {
463                         ERROR("write_sensu plugin: Unable to alloc memory");
464                         return NULL;
465                 }
466                 ret_str = temp_str;
467         }
468
469         // calculate the value and set to a string
470         if (ds->ds[index].type == DS_TYPE_GAUGE) {
471                 res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
472                 if (res == -1) {
473                         free(ret_str);
474                         ERROR("write_sensu plugin: Unable to alloc memory");
475                         return NULL;
476                 }
477         } else if (rates != NULL) {
478                 res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
479                 if (res == -1) {
480                         free(ret_str);
481                         ERROR("write_sensu plugin: Unable to alloc memory");
482                         return NULL;
483                 }
484         } else {
485                 if (ds->ds[index].type == DS_TYPE_DERIVE) {
486                         res = my_asprintf(&value_str, "%"PRIi64, vl->values[index].derive);
487                         if (res == -1) {
488                                 free(ret_str);
489                                 ERROR("write_sensu plugin: Unable to alloc memory");
490                                 return NULL;
491                         }
492                 }
493                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
494                         res = my_asprintf(&value_str, "%"PRIu64, vl->values[index].absolute);
495                         if (res == -1) {
496                                 free(ret_str);
497                                 ERROR("write_sensu plugin: Unable to alloc memory");
498                                 return NULL;
499                         }
500                 }
501                 else {
502                         res = my_asprintf(&value_str, "%llu", vl->values[index].counter);
503                         if (res == -1) {
504                                 free(ret_str);
505                                 ERROR("write_sensu plugin: Unable to alloc memory");
506                                 return NULL;
507                         }
508                 }
509         }
510
511         // Generate the full service name
512         sensu_format_name2(name_buffer, sizeof(name_buffer),
513                 vl->host, vl->plugin, vl->plugin_instance,
514                 vl->type, vl->type_instance, host->separator);
515         if (host->always_append_ds || (ds->ds_num > 1)) {
516                 if (host->event_service_prefix == NULL)
517                         ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s",
518                                         name_buffer, ds->ds[index].name);
519                 else
520                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
521                                         host->event_service_prefix, name_buffer, ds->ds[index].name);
522         } else {
523                 if (host->event_service_prefix == NULL)
524                         sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
525                 else
526                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
527                                         host->event_service_prefix, name_buffer);
528         }
529
530         // Replace collectd sensor name reserved characters so that time series DB is happy
531         in_place_replace_sensu_name_reserved(service_buffer);
532
533         // finalize the buffer by setting the output and closing curly bracket
534         res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n",ret_str, service_buffer, value_str, (long long)CDTIME_T_TO_TIME_T(vl->time));
535         free(ret_str);
536         free(value_str);
537         if (res == -1) {
538                 ERROR("write_sensu plugin: Unable to alloc memory");
539                 return NULL;
540         }
541         ret_str = temp_str;
542
543         DEBUG("write_sensu plugin: Successfully created json for metric: "
544                         "host = \"%s\", service = \"%s\"",
545                         vl->host, service_buffer);
546         return ret_str;
547 } /* }}} char *sensu_value_to_json */
548
549 /*
550  * Uses replace_str2() implementation from
551  * http://creativeandcritical.net/str-replace-c/
552  * copyright (c) Laird Shaw, under public domain.
553  */
554 static char *replace_str(const char *str, const char *old, /* {{{ */
555                 const char *new)
556 {
557         char *ret, *r;
558         const char *p, *q;
559         size_t oldlen = strlen(old);
560         size_t count = strlen(new);
561         size_t retlen;
562         size_t newlen = count;
563         int samesize = (oldlen == newlen);
564
565         if (!samesize) {
566                 for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
567                         count++;
568                 /* This is undefined if p - str > PTRDIFF_MAX */
569                 retlen = p - str + strlen(p) + count * (newlen - oldlen);
570         } else
571                 retlen = strlen(str);
572
573         ret = calloc(1, retlen + 1);
574         if (ret == NULL)
575                 return NULL;
576         // added to original: not optimized, but keeps valgrind happy.
577
578         r = ret;
579         p = str;
580         while (1) {
581                 /* If the old and new strings are different lengths - in other
582                  * words we have already iterated through with strstr above,
583                  * and thus we know how many times we need to call it - then we
584                  * can avoid the final (potentially lengthy) call to strstr,
585                  * which we already know is going to return NULL, by
586                  * decrementing and checking count.
587                  */
588                 if (!samesize && !count--)
589                         break;
590                 /* Otherwise i.e. when the old and new strings are the same
591                  * length, and we don't know how many times to call strstr,
592                  * we must check for a NULL return here (we check it in any
593                  * event, to avoid further conditions, and because there's
594                  * no harm done with the check even when the old and new
595                  * strings are different lengths).
596                  */
597                 if ((q = strstr(p, old)) == NULL)
598                         break;
599                 /* This is undefined if q - p > PTRDIFF_MAX */
600                 ptrdiff_t l = q - p;
601                 memcpy(r, p, l);
602                 r += l;
603                 memcpy(r, new, newlen);
604                 r += newlen;
605                 p = q + oldlen;
606         }
607         strncpy(r, p, strlen(p));
608
609         return ret;
610 } /* }}} char *replace_str */
611
612 static char *replace_json_reserved(const char *message) /* {{{ */
613 {
614         char *msg = replace_str(message, "\\", "\\\\");
615         if (msg == NULL) {
616                 ERROR("write_sensu plugin: Unable to alloc memory");
617                 return NULL;
618         }
619         char *tmp = replace_str(msg, "\"", "\\\"");
620         free(msg);
621         if (tmp == NULL) {
622                 ERROR("write_sensu plugin: Unable to alloc memory");
623                 return NULL;
624         }
625         msg = replace_str(tmp, "\n", "\\\n");
626         free(tmp);
627         if (msg == NULL) {
628                 ERROR("write_sensu plugin: Unable to alloc memory");
629                 return NULL;
630         }
631         return msg;
632 } /* }}} char *replace_json_reserved */
633
634 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
635                 notification_t const *n)
636 {
637         char service_buffer[6 * DATA_MAX_NAME_LEN];
638         char const *severity;
639         char *ret_str;
640         char *temp_str;
641         int status;
642         size_t i;
643         int res;
644         // add the severity/status
645         switch (n->severity) {
646                 case NOTIF_OKAY:
647                         severity = "OK";
648                         status = 0;
649                         break;
650                 case NOTIF_WARNING:
651                         severity = "WARNING";
652                         status = 1;
653                         break;
654                 case NOTIF_FAILURE:
655                         severity = "CRITICAL";
656                         status = 2;
657                         break;
658                 default:
659                         severity = "UNKNOWN";
660                         status = 3;
661         }
662         res = my_asprintf(&temp_str, "{\"status\": %d", status);
663         if (res == -1) {
664                 ERROR("write_sensu plugin: Unable to alloc memory");
665                 return NULL;
666         }
667         ret_str = temp_str;
668
669         // incorporate the timestamp
670         res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str, (long long)CDTIME_T_TO_TIME_T(n->time));
671         free(ret_str);
672         if (res == -1) {
673                 ERROR("write_sensu plugin: Unable to alloc memory");
674                 return NULL;
675         }
676         ret_str = temp_str;
677
678         char *handlers_str = build_json_str_list("handlers", &(host->notification_handlers));
679         if (handlers_str == NULL) {
680                 ERROR("write_sensu plugin: Unable to alloc memory");
681                 free(ret_str);
682                 return NULL;
683         }
684         // incorporate the handlers
685         if (strlen(handlers_str) != 0) {
686                 res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
687                 free(ret_str);
688                 free(handlers_str);
689                 if (res == -1) {
690                         ERROR("write_sensu plugin: Unable to alloc memory");
691                         return NULL;
692                 }
693                 ret_str = temp_str;
694         } else {
695                 free(handlers_str);
696         }
697
698         // incorporate the plugin name information if any
699         if (n->plugin[0] != 0) {
700                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, n->plugin);
701                 free(ret_str);
702                 if (res == -1) {
703                         ERROR("write_sensu plugin: Unable to alloc memory");
704                         return NULL;
705                 }
706                 ret_str = temp_str;
707         }
708
709         // incorporate the plugin type if any
710         if (n->type[0] != 0) {
711                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, n->type);
712                 free(ret_str);
713                 if (res == -1) {
714                         ERROR("write_sensu plugin: Unable to alloc memory");
715                         return NULL;
716                 }
717                 ret_str = temp_str;
718         }
719
720         // incorporate the plugin instance if any
721         if (n->plugin_instance[0] != 0) {
722                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, n->plugin_instance);
723                 free(ret_str);
724                 if (res == -1) {
725                         ERROR("write_sensu plugin: Unable to alloc memory");
726                         return NULL;
727                 }
728                 ret_str = temp_str;
729         }
730
731         // incorporate the plugin type instance if any
732         if (n->type_instance[0] != 0) {
733                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, n->type_instance);
734                 free(ret_str);
735                 if (res == -1) {
736                         ERROR("write_sensu plugin: Unable to alloc memory");
737                         return NULL;
738                 }
739                 ret_str = temp_str;
740         }
741
742         // add key value attributes from config if any
743         for (i = 0; i < sensu_attrs_num; i += 2) {
744                 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
745                 free(ret_str);
746                 if (res == -1) {
747                         ERROR("write_sensu plugin: Unable to alloc memory");
748                         return NULL;
749                 }
750                 ret_str = temp_str;
751         }
752
753         // incorporate sensu tags from config if any
754         if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
755                 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
756                 free(ret_str);
757                 if (res == -1) {
758                         ERROR("write_sensu plugin: Unable to alloc memory");
759                         return NULL;
760                 }
761                 ret_str = temp_str;
762         }
763
764         // incorporate the service name
765         sensu_format_name2(service_buffer, sizeof(service_buffer),
766                                 /* host */ "", n->plugin, n->plugin_instance,
767                                 n->type, n->type_instance, host->separator);
768         // replace sensu event name chars that are considered illegal
769         in_place_replace_sensu_name_reserved(service_buffer);
770         res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str, &service_buffer[1]);
771         free(ret_str);
772         if (res == -1) {
773                 ERROR("write_sensu plugin: Unable to alloc memory");
774                 return NULL;
775         }
776         ret_str = temp_str;
777
778         // incorporate the check output
779         if (n->message[0] != 0) {
780                 char *msg = replace_json_reserved(n->message);
781                 if (msg == NULL) {
782                         ERROR("write_sensu plugin: Unable to alloc memory");
783                         free(ret_str);
784                         return NULL;
785                 }
786                 res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str, severity, msg);
787                 free(ret_str);
788                 free(msg);
789                 if (res == -1) {
790                         ERROR("write_sensu plugin: Unable to alloc memory");
791                         return NULL;
792                 }
793                 ret_str = temp_str;
794         }
795
796         // Pull in values from threshold and add extra attributes
797         for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
798                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) {
799                         res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str, meta->nm_value.nm_double);
800                         free(ret_str);
801                         if (res == -1) {
802                                 ERROR("write_sensu plugin: Unable to alloc memory");
803                                 return NULL;
804                         }
805                         ret_str = temp_str;
806                 }
807                 if (meta->type == NM_TYPE_STRING) {
808                         res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name, meta->nm_value.nm_string);
809                         free(ret_str);
810                         if (res == -1) {
811                                 ERROR("write_sensu plugin: Unable to alloc memory");
812                                 return NULL;
813                         }
814                         ret_str = temp_str;
815                 }
816         }
817
818         // close the curly bracket
819         res = my_asprintf(&temp_str, "%s}\n", ret_str);
820         free(ret_str);
821         if (res == -1) {
822                 ERROR("write_sensu plugin: Unable to alloc memory");
823                 return NULL;
824         }
825         ret_str = temp_str;
826
827         DEBUG("write_sensu plugin: Successfully created JSON for notification: "
828                                 "host = \"%s\", service = \"%s\", state = \"%s\"",
829                                 n->host, service_buffer, severity);
830         return ret_str;
831 } /* }}} char *sensu_notification_to_json */
832
833 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
834 {
835         int status = 0;
836         size_t  buffer_len;
837
838         status = sensu_connect(host);
839         if (status != 0)
840                 return status;
841
842         buffer_len = strlen(msg);
843
844         status = (int) swrite(host->s, msg, buffer_len);
845         sensu_close_socket(host);
846
847         if (status != 0) {
848                 char errbuf[1024];
849                 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
850                                 (host->node != NULL) ? host->node : SENSU_HOST,
851                                 (host->service != NULL) ? host->service : SENSU_PORT,
852                                 sstrerror(errno, errbuf, sizeof(errbuf)));
853                 return -1;
854         }
855
856         return 0;
857 } /* }}} int sensu_send_msg */
858
859
860 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
861 {
862         int status = 0;
863
864         status = sensu_send_msg(host, msg);
865         if (status != 0) {
866                 host->flags &= ~F_READY;
867                 if (host->res != NULL) {
868                         freeaddrinfo(host->res);
869                         host->res = NULL;
870                 }
871                 return status;
872         }
873
874         return 0;
875 } /* }}} int sensu_send */
876
877
878 static int sensu_write(const data_set_t *ds, /* {{{ */
879               const value_list_t *vl,
880               user_data_t *ud)
881 {
882         int status = 0;
883         int statuses[vl->values_len];
884         struct sensu_host       *host = ud->data;
885         gauge_t *rates = NULL;
886         char *msg;
887
888         pthread_mutex_lock(&host->lock);
889         memset(statuses, 0, vl->values_len * sizeof(*statuses));
890
891         if (host->store_rates) {
892                 rates = uc_get_rate(ds, vl);
893                 if (rates == NULL) {
894                         ERROR("write_sensu plugin: uc_get_rate failed.");
895                         pthread_mutex_unlock(&host->lock);
896                         return -1;
897                 }
898         }
899         for (size_t i = 0; i < vl->values_len; i++) {
900                 msg = sensu_value_to_json(host, ds, vl, (int) i, rates, statuses[i]);
901                 if (msg == NULL) {
902                         sfree(rates);
903                         pthread_mutex_unlock(&host->lock);
904                         return -1;
905                 }
906                 status = sensu_send(host, msg);
907                 free(msg);
908                 if (status != 0) {
909                         ERROR("write_sensu plugin: sensu_send failed with status %i", status);
910                         pthread_mutex_unlock(&host->lock);
911                         sfree(rates);
912                         return status;
913                 }
914         }
915         sfree(rates);
916         pthread_mutex_unlock(&host->lock);
917         return status;
918 } /* }}} int sensu_write */
919
920 static int sensu_notification(const notification_t *n, user_data_t *ud) /* {{{ */
921 {
922         int     status;
923         struct sensu_host *host = ud->data;
924         char *msg;
925
926         pthread_mutex_lock(&host->lock);
927
928         msg = sensu_notification_to_json(host, n);
929         if (msg == NULL) {
930                 pthread_mutex_unlock(&host->lock);
931                 return -1;
932         }
933
934         status = sensu_send(host, msg);
935         free(msg);
936         if (status != 0)
937                 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
938         pthread_mutex_unlock(&host->lock);
939
940         return status;
941 } /* }}} int sensu_notification */
942
943 static void sensu_free(void *p) /* {{{ */
944 {
945         struct sensu_host *host = p;
946
947         if (host == NULL)
948                 return;
949
950         pthread_mutex_lock(&host->lock);
951
952         host->reference_count--;
953         if (host->reference_count > 0) {
954                 pthread_mutex_unlock(&host->lock);
955                 return;
956         }
957
958         sensu_close_socket(host);
959         if (host->res != NULL) {
960                 freeaddrinfo(host->res);
961                 host->res = NULL;
962         }
963         sfree(host->service);
964         sfree(host->event_service_prefix);
965         sfree(host->name);
966         sfree(host->node);
967         sfree(host->separator);
968         free_str_list(&(host->metric_handlers));
969         free_str_list(&(host->notification_handlers));
970         pthread_mutex_destroy(&host->lock);
971         sfree(host);
972 } /* }}} void sensu_free */
973
974
975 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
976 {
977         struct sensu_host       *host = NULL;
978         int                                     status = 0;
979         oconfig_item_t          *child;
980         char                            callback_name[DATA_MAX_NAME_LEN];
981         user_data_t                     ud;
982
983         if ((host = calloc(1, sizeof(*host))) == NULL) {
984                 ERROR("write_sensu plugin: calloc failed.");
985                 return ENOMEM;
986         }
987         pthread_mutex_init(&host->lock, NULL);
988         host->reference_count = 1;
989         host->node = NULL;
990         host->service = NULL;
991         host->notifications = 0;
992         host->metrics = 0;
993         host->store_rates = 1;
994         host->always_append_ds = 0;
995         host->metric_handlers.nb_strs = 0;
996         host->metric_handlers.strs = NULL;
997         host->notification_handlers.nb_strs = 0;
998         host->notification_handlers.strs = NULL;
999         host->separator = strdup("/");
1000         if (host->separator == NULL) {
1001                 ERROR("write_sensu plugin: Unable to alloc memory");
1002                 sensu_free(host);
1003                 return -1;
1004         }
1005
1006         status = cf_util_get_string(ci, &host->name);
1007         if (status != 0) {
1008                 WARNING("write_sensu plugin: Required host name is missing.");
1009                 sensu_free(host);
1010                 return -1;
1011         }
1012
1013         for (int i = 0; i < ci->children_num; i++) {
1014                 child = &ci->children[i];
1015                 status = 0;
1016
1017                 if (strcasecmp("Host", child->key) == 0) {
1018                         status = cf_util_get_string(child, &host->node);
1019                         if (status != 0)
1020                                 break;
1021                 } else if (strcasecmp("Notifications", child->key) == 0) {
1022                         status = cf_util_get_boolean(child, &host->notifications);
1023                         if (status != 0)
1024                                 break;
1025                 } else if (strcasecmp("Metrics", child->key) == 0) {
1026                                         status = cf_util_get_boolean(child, &host->metrics);
1027                                         if (status != 0)
1028                                                 break;
1029                 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1030                         status = cf_util_get_string(child, &host->event_service_prefix);
1031                         if (status != 0)
1032                                 break;
1033                 } else if (strcasecmp("Separator", child->key) == 0) {
1034                                 status = cf_util_get_string(child, &host->separator);
1035                                 if (status != 0)
1036                                         break;
1037                 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1038                         char *temp_str = NULL;
1039                         status = cf_util_get_string(child, &temp_str);
1040                         if (status != 0)
1041                                 break;
1042                         status = add_str_to_list(&(host->metric_handlers), temp_str);
1043                         free(temp_str);
1044                         if (status != 0)
1045                                 break;
1046                 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1047                         char *temp_str = NULL;
1048                         status = cf_util_get_string(child, &temp_str);
1049                         if (status != 0)
1050                                 break;
1051                         status = add_str_to_list(&(host->notification_handlers), temp_str);
1052                         free(temp_str);
1053                         if (status != 0)
1054                                 break;
1055                 } else if (strcasecmp("Port", child->key) == 0) {
1056                         status = cf_util_get_service(child, &host->service);
1057                         if (status != 0) {
1058                                 ERROR("write_sensu plugin: Invalid argument "
1059                                                 "configured for the \"Port\" "
1060                                                 "option.");
1061                                 break;
1062                         }
1063                 } else if (strcasecmp("StoreRates", child->key) == 0) {
1064                         status = cf_util_get_boolean(child, &host->store_rates);
1065                         if (status != 0)
1066                                 break;
1067                 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1068                         status = cf_util_get_boolean(child,
1069                                         &host->always_append_ds);
1070                         if (status != 0)
1071                                 break;
1072                 } else {
1073                         WARNING("write_sensu plugin: ignoring unknown config "
1074                                 "option: \"%s\"", child->key);
1075                 }
1076         }
1077         if (status != 0) {
1078                 sensu_free(host);
1079                 return status;
1080         }
1081
1082         if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1083                         sensu_free(host);
1084                         WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. Giving up.");
1085                         return -1;
1086                 }
1087
1088         if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1089                 sensu_free(host);
1090                 WARNING("write_sensu plugin: notifications enabled but no NotificationHandler defined. Giving up.");
1091                 return -1;
1092         }
1093
1094         if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1095                 WARNING("write_sensu plugin: NotificationHandler given so forcing notifications to be enabled");
1096                 host->notifications = 1;
1097         }
1098
1099         if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1100                 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be enabled");
1101                 host->metrics = 1;
1102         }
1103
1104         if (!(host->notifications || host->metrics)) {
1105                 WARNING("write_sensu plugin: neither metrics nor notifications enabled. Giving up.");
1106                 sensu_free(host);
1107                 return -1;
1108         }
1109
1110         ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1111         ud.data = host;
1112         ud.free_func = sensu_free;
1113
1114         pthread_mutex_lock(&host->lock);
1115
1116         if (host->metrics) {
1117                 status = plugin_register_write(callback_name, sensu_write, &ud);
1118                 if (status != 0)
1119                         WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1120                                         "failed with status %i.",
1121                                         callback_name, status);
1122                 else /* success */
1123                         host->reference_count++;
1124         }
1125
1126         if (host->notifications) {
1127                 status = plugin_register_notification(callback_name, sensu_notification, &ud);
1128                 if (status != 0)
1129                         WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1130                                         "failed with status %i.",
1131                                         callback_name, status);
1132                 else
1133                         host->reference_count++;
1134         }
1135
1136         if (host->reference_count <= 1) {
1137                 /* Both callbacks failed => free memory.
1138                  * We need to unlock here, because sensu_free() will lock.
1139                  * This is not a race condition, because we're the only one
1140                  * holding a reference. */
1141                 pthread_mutex_unlock(&host->lock);
1142                 sensu_free(host);
1143                 return -1;
1144         }
1145
1146         host->reference_count--;
1147         pthread_mutex_unlock(&host->lock);
1148
1149         return status;
1150 } /* }}} int sensu_config_node */
1151
1152 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1153 {
1154         oconfig_item_t  *child;
1155         int              status;
1156         struct str_list sensu_tags_arr;
1157
1158         sensu_tags_arr.nb_strs = 0;
1159         sensu_tags_arr.strs = NULL;
1160
1161         for (int i = 0; i < ci->children_num; i++)  {
1162                 child = &ci->children[i];
1163
1164                 if (strcasecmp("Node", child->key) == 0) {
1165                         sensu_config_node(child);
1166                 } else if (strcasecmp(child->key, "attribute") == 0) {
1167                         if (child->values_num != 2) {
1168                                 WARNING("sensu attributes need both a key and a value.");
1169                                 continue;
1170                         }
1171                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
1172                                         child->values[1].type != OCONFIG_TYPE_STRING) {
1173                                 WARNING("sensu attribute needs string arguments.");
1174                                 continue;
1175                         }
1176
1177                         strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[0].value.string);
1178                         strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[1].value.string);
1179
1180                         DEBUG("write_sensu plugin: New attribute: %s => %s",
1181                                         child->values[0].value.string,
1182                                         child->values[1].value.string);
1183                 } else if (strcasecmp(child->key, "tag") == 0) {
1184                         char *tmp = NULL;
1185                         status = cf_util_get_string(child, &tmp);
1186                         if (status != 0)
1187                                 continue;
1188
1189                         status = add_str_to_list(&sensu_tags_arr, tmp);
1190                         sfree(tmp);
1191                         if (status != 0)
1192                                 continue;
1193                         DEBUG("write_sensu plugin: Got tag: %s", tmp);
1194                 } else {
1195                         WARNING("write_sensu plugin: Ignoring unknown "
1196                                  "configuration option \"%s\" at top level.",
1197                                  child->key);
1198                 }
1199         }
1200         if (sensu_tags_arr.nb_strs > 0) {
1201                 sfree (sensu_tags);
1202                 sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1203                 free_str_list(&sensu_tags_arr);
1204                 if (sensu_tags == NULL) {
1205                         ERROR("write_sensu plugin: Unable to alloc memory");
1206                         return -1;
1207                 }
1208         }
1209         return 0;
1210 } /* }}} int sensu_config */
1211
1212 void module_register(void)
1213 {
1214         plugin_register_complex_config("write_sensu", sensu_config);
1215 }
1216
1217 /* vim: set sw=8 sts=8 ts=8 noet : */