Merge pull request #1876 from octo/issue/1819
[collectd.git] / src / write_sensu.c
1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26
27 #define _GNU_SOURCE
28
29 #include "collectd.h"
30
31 #include "plugin.h"
32 #include "common.h"
33 #include "configfile.h"
34 #include "utils_cache.h"
35 #include <arpa/inet.h>
36 #include <errno.h>
37 #include <netdb.h>
38 #include <inttypes.h>
39 #include <stddef.h>
40
41 #include <stdlib.h>
42 #define SENSU_HOST              "localhost"
43 #define SENSU_PORT              "3030"
44
45 #ifdef HAVE_ASPRINTF
46 #define my_asprintf asprintf
47 #define my_vasprintf vasprintf
48 #else
49 /*
50  * asprintf() is available from Solaris 10 update 11.
51  * For older versions, use asprintf() portable implementation from
52  * https://github.com/littlstar/asprintf.c/blob/master/
53  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
54  */
55
56 static int my_vasprintf(char **str, const char *fmt, va_list args) {
57        int size = 0;
58        va_list tmpa;
59        // copy
60        va_copy(tmpa, args);
61        // apply variadic arguments to
62        // sprintf with format to get size
63        size = vsnprintf(NULL, size, fmt, tmpa);
64        // toss args
65        va_end(tmpa);
66        // return -1 to be compliant if
67        // size is less than 0
68        if (size < 0) { return -1; }
69        // alloc with size plus 1 for `\0'
70        *str = (char *) malloc(size + 1);
71        // return -1 to be compliant
72        // if pointer is `NULL'
73        if (NULL == *str) { return -1; }
74        // format string with original
75        // variadic arguments and set new size
76        size = vsprintf(*str, fmt, args);
77        return size;
78 }
79
80 static int my_asprintf(char **str, const char *fmt, ...) {
81        int size = 0;
82        va_list args;
83        // init variadic argumens
84        va_start(args, fmt);
85        // format and get size
86        size = my_vasprintf(str, fmt, args);
87        // toss args
88        va_end(args);
89        return size;
90 }
91
92 #endif
93
94 struct str_list {
95         int nb_strs;
96         char **strs;
97 };
98
99 struct sensu_host {
100         char                    *name;
101         char                    *event_service_prefix;
102         struct str_list metric_handlers;
103         struct str_list notification_handlers;
104 #define F_READY      0x01
105         uint8_t                  flags;
106         pthread_mutex_t  lock;
107         _Bool            notifications;
108         _Bool            metrics;
109         _Bool                    store_rates;
110         _Bool                    always_append_ds;
111         char                    *separator;
112         char                    *node;
113         char                    *service;
114         int              s;
115         struct addrinfo *res;
116         int                          reference_count;
117 };
118
119 static char     *sensu_tags = NULL;
120 static char     **sensu_attrs = NULL;
121 static size_t sensu_attrs_num;
122
123 static int add_str_to_list(struct str_list *strs,
124                 const char *str_to_add) /* {{{ */
125 {
126         char **old_strs_ptr = strs->strs;
127         char *newstr = strdup(str_to_add);
128         if (newstr == NULL) {
129                 ERROR("write_sensu plugin: Unable to alloc memory");
130                 return -1;
131         }
132         strs->strs = realloc(strs->strs, sizeof(char *) *(strs->nb_strs + 1));
133         if (strs->strs == NULL) {
134                 strs->strs = old_strs_ptr;
135                 free(newstr);
136                 ERROR("write_sensu plugin: Unable to alloc memory");
137                 return -1;
138         }
139         strs->strs[strs->nb_strs] = newstr;
140         strs->nb_strs++;
141         return 0;
142 }
143 /* }}} int add_str_to_list */
144
145 static void free_str_list(struct str_list *strs) /* {{{ */
146 {
147         for (int i=0; i<strs->nb_strs; i++)
148                 free(strs->strs[i]);
149         free(strs->strs);
150 }
151 /* }}} void free_str_list */
152
153 static int sensu_connect(struct sensu_host *host) /* {{{ */
154 {
155         int                      e;
156         char const              *node;
157         char const              *service;
158
159         // Resolve the target if we haven't done already
160         if (!(host->flags & F_READY)) {
161                 memset(&service, 0, sizeof(service));
162                 host->res = NULL;
163
164                 node = (host->node != NULL) ? host->node : SENSU_HOST;
165                 service = (host->service != NULL) ? host->service : SENSU_PORT;
166
167                 struct addrinfo ai_hints = {
168                         .ai_family = AF_INET,
169                         .ai_flags = AI_ADDRCONFIG,
170                         .ai_socktype = SOCK_STREAM
171                 };
172
173                 if ((e = getaddrinfo(node, service, &ai_hints, &(host->res))) != 0) {
174                         ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s",
175                                         node, gai_strerror(e));
176                         return -1;
177                 }
178                 DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s",
179                                 node, service);
180                 host->flags |= F_READY;
181         }
182
183         struct linger so_linger;
184         host->s = -1;
185         for (struct addrinfo *ai = host->res; ai != NULL; ai = ai->ai_next) {
186                 // create the socket
187                 if ((host->s = socket(ai->ai_family,
188                                       ai->ai_socktype,
189                                       ai->ai_protocol)) == -1) {
190                         continue;
191                 }
192
193                 // Set very low close() lingering
194                 so_linger.l_onoff = 1;
195                 so_linger.l_linger = 3;
196                 if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger) != 0)
197                         WARNING("write_sensu plugin: failed to set socket close() lingering");
198
199                 set_sock_opts(host->s);
200
201                 // connect the socket
202                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
203                         close(host->s);
204                         host->s = -1;
205                         continue;
206                 }
207                 DEBUG("write_sensu plugin: connected");
208                 break;
209         }
210
211         if (host->s < 0) {
212                 WARNING("write_sensu plugin: Unable to connect to sensu client");
213                 return -1;
214         }
215         return 0;
216 } /* }}} int sensu_connect */
217
218 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
219 {
220         if (host->s != -1)
221                 close(host->s);
222         host->s = -1;
223
224 } /* }}} void sensu_close_socket */
225
226 static char *build_json_str_list(const char *tag, struct str_list const *list) /* {{{ */
227 {
228         int res;
229         char *ret_str = NULL;
230         char *temp_str;
231         if (list->nb_strs == 0) {
232                 ret_str = malloc(sizeof(char));
233                 if (ret_str == NULL) {
234                         ERROR("write_sensu plugin: Unable to alloc memory");
235                         return NULL;
236                 }
237                 ret_str[0] = '\0';
238         }
239
240         res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
241         if (res == -1) {
242                 ERROR("write_sensu plugin: Unable to alloc memory");
243                 free(ret_str);
244                 return NULL;
245         }
246         for (int i=1; i<list->nb_strs; i++) {
247                 res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
248                 free(temp_str);
249                 if (res == -1) {
250                         ERROR("write_sensu plugin: Unable to alloc memory");
251                         return NULL;
252                 }
253                 temp_str = ret_str;
254         }
255         res = my_asprintf(&ret_str, "%s]", temp_str);
256         free(temp_str);
257         if (res == -1) {
258                 ERROR("write_sensu plugin: Unable to alloc memory");
259                 return NULL;
260         }
261
262         return ret_str;
263 } /* }}} char *build_json_str_list*/
264
265 static int sensu_format_name2(char *ret, int ret_len,
266                 const char *hostname,
267                 const char *plugin, const char *plugin_instance,
268                 const char *type, const char *type_instance,
269                 const char *separator)
270 {
271         char *buffer;
272         size_t buffer_size;
273
274         buffer = ret;
275         buffer_size = (size_t) ret_len;
276
277 #define APPEND(str) do {          \
278         size_t l = strlen (str);        \
279         if (l >= buffer_size)           \
280                 return (ENOBUFS);             \
281         memcpy (buffer, (str), l);      \
282         buffer += l; buffer_size -= l;  \
283 } while (0)
284
285         assert (plugin != NULL);
286         assert (type != NULL);
287
288         APPEND (hostname);
289         APPEND (separator);
290         APPEND (plugin);
291         if ((plugin_instance != NULL) && (plugin_instance[0] != 0))
292         {
293                 APPEND ("-");
294                 APPEND (plugin_instance);
295         }
296         APPEND (separator);
297         APPEND (type);
298         if ((type_instance != NULL) && (type_instance[0] != 0))
299         {
300                 APPEND ("-");
301                 APPEND (type_instance);
302         }
303         assert (buffer_size > 0);
304         buffer[0] = 0;
305
306 #undef APPEND
307         return (0);
308 } /* int sensu_format_name2 */
309
310 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
311 {
312         int len=strlen(orig_name);
313         for (int i=0; i<len; i++) {
314                 // some plugins like ipmi generate special characters in metric name
315                 switch(orig_name[i]) {
316                         case '(': orig_name[i] = '_'; break;
317                         case ')': orig_name[i] = '_'; break;
318                         case ' ': orig_name[i] = '_'; break;
319                         case '"': orig_name[i] = '_'; break;
320                         case '\'': orig_name[i] = '_'; break;
321                         case '+': orig_name[i] = '_'; break;
322                 }
323         }
324 } /* }}} char *replace_sensu_name_reserved */
325
326 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
327                 data_set_t const *ds,
328                 value_list_t const *vl, size_t index,
329                 gauge_t const *rates,
330                 int status)
331 {
332         char name_buffer[5 * DATA_MAX_NAME_LEN];
333         char service_buffer[6 * DATA_MAX_NAME_LEN];
334         char *ret_str;
335         char *temp_str;
336         char *value_str;
337         int res;
338         // First part of the JSON string
339         const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
340
341         char *handlers_str = build_json_str_list("handlers", &(host->metric_handlers));
342         if (handlers_str == NULL) {
343                 ERROR("write_sensu plugin: Unable to alloc memory");
344                 return NULL;
345         }
346
347         // incorporate the handlers
348         if (strlen(handlers_str) == 0) {
349                 free(handlers_str);
350                 ret_str = strdup(part1);
351                 if (ret_str == NULL) {
352                         ERROR("write_sensu plugin: Unable to alloc memory");
353                         return NULL;
354                 }
355         }
356         else {
357                 res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
358                 free(handlers_str);
359                 if (res == -1) {
360                         ERROR("write_sensu plugin: Unable to alloc memory");
361                         return NULL;
362                 }
363         }
364
365         // incorporate the plugin name information
366         res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, vl->plugin);
367         free(ret_str);
368         if (res == -1) {
369                 ERROR("write_sensu plugin: Unable to alloc memory");
370                 return NULL;
371         }
372         ret_str = temp_str;
373
374         // incorporate the plugin type
375         res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, vl->type);
376         free(ret_str);
377         if (res == -1) {
378                 ERROR("write_sensu plugin: Unable to alloc memory");
379                 return NULL;
380         }
381         ret_str = temp_str;
382
383         // incorporate the plugin instance if any
384         if (vl->plugin_instance[0] != 0) {
385                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, vl->plugin_instance);
386                 free(ret_str);
387                 if (res == -1) {
388                         ERROR("write_sensu plugin: Unable to alloc memory");
389                         return NULL;
390                 }
391                 ret_str = temp_str;
392         }
393
394         // incorporate the plugin type instance if any
395         if (vl->type_instance[0] != 0) {
396                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, vl->type_instance);
397                 free(ret_str);
398                 if (res == -1) {
399                         ERROR("write_sensu plugin: Unable to alloc memory");
400                         return NULL;
401                 }
402                 ret_str = temp_str;
403         }
404
405         // incorporate the data source type
406         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
407                 char ds_type[DATA_MAX_NAME_LEN];
408                 ssnprintf (ds_type, sizeof (ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type));
409                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, ds_type);
410                 free(ret_str);
411                 if (res == -1) {
412                         ERROR("write_sensu plugin: Unable to alloc memory");
413                         return NULL;
414                 }
415                 ret_str = temp_str;
416         } else {
417                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
418                 free(ret_str);
419                 if (res == -1) {
420                         ERROR("write_sensu plugin: Unable to alloc memory");
421                         return NULL;
422                 }
423                 ret_str = temp_str;
424         }
425
426         // incorporate the data source name
427         res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"", ret_str, ds->ds[index].name);
428         free(ret_str);
429         if (res == -1) {
430                 ERROR("write_sensu plugin: Unable to alloc memory");
431                 return NULL;
432         }
433         ret_str = temp_str;
434
435         // incorporate the data source index
436         {
437                 char ds_index[DATA_MAX_NAME_LEN];
438                 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
439                 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s", ret_str, ds_index);
440                 free(ret_str);
441                 if (res == -1) {
442                         ERROR("write_sensu plugin: Unable to alloc memory");
443                         return NULL;
444                 }
445                 ret_str = temp_str;
446         }
447
448         // add key value attributes from config if any
449         for (size_t i = 0; i < sensu_attrs_num; i += 2) {
450                 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
451                 free(ret_str);
452                 if (res == -1) {
453                         ERROR("write_sensu plugin: Unable to alloc memory");
454                         return NULL;
455                 }
456                 ret_str = temp_str;
457         }
458
459         // incorporate sensu tags from config if any
460         if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
461                 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
462                 free(ret_str);
463                 if (res == -1) {
464                         ERROR("write_sensu plugin: Unable to alloc memory");
465                         return NULL;
466                 }
467                 ret_str = temp_str;
468         }
469
470         // calculate the value and set to a string
471         if (ds->ds[index].type == DS_TYPE_GAUGE) {
472                 res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
473                 if (res == -1) {
474                         free(ret_str);
475                         ERROR("write_sensu plugin: Unable to alloc memory");
476                         return NULL;
477                 }
478         } else if (rates != NULL) {
479                 res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
480                 if (res == -1) {
481                         free(ret_str);
482                         ERROR("write_sensu plugin: Unable to alloc memory");
483                         return NULL;
484                 }
485         } else {
486                 if (ds->ds[index].type == DS_TYPE_DERIVE) {
487                         res = my_asprintf(&value_str, "%"PRIi64, vl->values[index].derive);
488                         if (res == -1) {
489                                 free(ret_str);
490                                 ERROR("write_sensu plugin: Unable to alloc memory");
491                                 return NULL;
492                         }
493                 }
494                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
495                         res = my_asprintf(&value_str, "%"PRIu64, vl->values[index].absolute);
496                         if (res == -1) {
497                                 free(ret_str);
498                                 ERROR("write_sensu plugin: Unable to alloc memory");
499                                 return NULL;
500                         }
501                 }
502                 else {
503                         res = my_asprintf(&value_str, "%llu", vl->values[index].counter);
504                         if (res == -1) {
505                                 free(ret_str);
506                                 ERROR("write_sensu plugin: Unable to alloc memory");
507                                 return NULL;
508                         }
509                 }
510         }
511
512         // Generate the full service name
513         sensu_format_name2(name_buffer, sizeof(name_buffer),
514                 vl->host, vl->plugin, vl->plugin_instance,
515                 vl->type, vl->type_instance, host->separator);
516         if (host->always_append_ds || (ds->ds_num > 1)) {
517                 if (host->event_service_prefix == NULL)
518                         ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s",
519                                         name_buffer, ds->ds[index].name);
520                 else
521                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
522                                         host->event_service_prefix, name_buffer, ds->ds[index].name);
523         } else {
524                 if (host->event_service_prefix == NULL)
525                         sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
526                 else
527                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
528                                         host->event_service_prefix, name_buffer);
529         }
530
531         // Replace collectd sensor name reserved characters so that time series DB is happy
532         in_place_replace_sensu_name_reserved(service_buffer);
533
534         // finalize the buffer by setting the output and closing curly bracket
535         res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n",ret_str, service_buffer, value_str, (long long)CDTIME_T_TO_TIME_T(vl->time));
536         free(ret_str);
537         free(value_str);
538         if (res == -1) {
539                 ERROR("write_sensu plugin: Unable to alloc memory");
540                 return NULL;
541         }
542         ret_str = temp_str;
543
544         DEBUG("write_sensu plugin: Successfully created json for metric: "
545                         "host = \"%s\", service = \"%s\"",
546                         vl->host, service_buffer);
547         return ret_str;
548 } /* }}} char *sensu_value_to_json */
549
550 /*
551  * Uses replace_str2() implementation from
552  * http://creativeandcritical.net/str-replace-c/
553  * copyright (c) Laird Shaw, under public domain.
554  */
555 static char *replace_str(const char *str, const char *old, /* {{{ */
556                 const char *new)
557 {
558         char *ret, *r;
559         const char *p, *q;
560         size_t oldlen = strlen(old);
561         size_t count = strlen(new);
562         size_t retlen;
563         size_t newlen = count;
564         int samesize = (oldlen == newlen);
565
566         if (!samesize) {
567                 for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
568                         count++;
569                 /* This is undefined if p - str > PTRDIFF_MAX */
570                 retlen = p - str + strlen(p) + count * (newlen - oldlen);
571         } else
572                 retlen = strlen(str);
573
574         ret = calloc(1, retlen + 1);
575         if (ret == NULL)
576                 return NULL;
577         // added to original: not optimized, but keeps valgrind happy.
578
579         r = ret;
580         p = str;
581         while (1) {
582                 /* If the old and new strings are different lengths - in other
583                  * words we have already iterated through with strstr above,
584                  * and thus we know how many times we need to call it - then we
585                  * can avoid the final (potentially lengthy) call to strstr,
586                  * which we already know is going to return NULL, by
587                  * decrementing and checking count.
588                  */
589                 if (!samesize && !count--)
590                         break;
591                 /* Otherwise i.e. when the old and new strings are the same
592                  * length, and we don't know how many times to call strstr,
593                  * we must check for a NULL return here (we check it in any
594                  * event, to avoid further conditions, and because there's
595                  * no harm done with the check even when the old and new
596                  * strings are different lengths).
597                  */
598                 if ((q = strstr(p, old)) == NULL)
599                         break;
600                 /* This is undefined if q - p > PTRDIFF_MAX */
601                 ptrdiff_t l = q - p;
602                 memcpy(r, p, l);
603                 r += l;
604                 memcpy(r, new, newlen);
605                 r += newlen;
606                 p = q + oldlen;
607         }
608         strncpy(r, p, strlen(p));
609
610         return ret;
611 } /* }}} char *replace_str */
612
613 static char *replace_json_reserved(const char *message) /* {{{ */
614 {
615         char *msg = replace_str(message, "\\", "\\\\");
616         if (msg == NULL) {
617                 ERROR("write_sensu plugin: Unable to alloc memory");
618                 return NULL;
619         }
620         char *tmp = replace_str(msg, "\"", "\\\"");
621         free(msg);
622         if (tmp == NULL) {
623                 ERROR("write_sensu plugin: Unable to alloc memory");
624                 return NULL;
625         }
626         msg = replace_str(tmp, "\n", "\\\n");
627         free(tmp);
628         if (msg == NULL) {
629                 ERROR("write_sensu plugin: Unable to alloc memory");
630                 return NULL;
631         }
632         return msg;
633 } /* }}} char *replace_json_reserved */
634
635 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
636                 notification_t const *n)
637 {
638         char service_buffer[6 * DATA_MAX_NAME_LEN];
639         char const *severity;
640         char *ret_str;
641         char *temp_str;
642         int status;
643         size_t i;
644         int res;
645         // add the severity/status
646         switch (n->severity) {
647                 case NOTIF_OKAY:
648                         severity = "OK";
649                         status = 0;
650                         break;
651                 case NOTIF_WARNING:
652                         severity = "WARNING";
653                         status = 1;
654                         break;
655                 case NOTIF_FAILURE:
656                         severity = "CRITICAL";
657                         status = 2;
658                         break;
659                 default:
660                         severity = "UNKNOWN";
661                         status = 3;
662         }
663         res = my_asprintf(&temp_str, "{\"status\": %d", status);
664         if (res == -1) {
665                 ERROR("write_sensu plugin: Unable to alloc memory");
666                 return NULL;
667         }
668         ret_str = temp_str;
669
670         // incorporate the timestamp
671         res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str, (long long)CDTIME_T_TO_TIME_T(n->time));
672         free(ret_str);
673         if (res == -1) {
674                 ERROR("write_sensu plugin: Unable to alloc memory");
675                 return NULL;
676         }
677         ret_str = temp_str;
678
679         char *handlers_str = build_json_str_list("handlers", &(host->notification_handlers));
680         if (handlers_str == NULL) {
681                 ERROR("write_sensu plugin: Unable to alloc memory");
682                 free(ret_str);
683                 return NULL;
684         }
685         // incorporate the handlers
686         if (strlen(handlers_str) != 0) {
687                 res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
688                 free(ret_str);
689                 free(handlers_str);
690                 if (res == -1) {
691                         ERROR("write_sensu plugin: Unable to alloc memory");
692                         return NULL;
693                 }
694                 ret_str = temp_str;
695         } else {
696                 free(handlers_str);
697         }
698
699         // incorporate the plugin name information if any
700         if (n->plugin[0] != 0) {
701                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, n->plugin);
702                 free(ret_str);
703                 if (res == -1) {
704                         ERROR("write_sensu plugin: Unable to alloc memory");
705                         return NULL;
706                 }
707                 ret_str = temp_str;
708         }
709
710         // incorporate the plugin type if any
711         if (n->type[0] != 0) {
712                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, n->type);
713                 free(ret_str);
714                 if (res == -1) {
715                         ERROR("write_sensu plugin: Unable to alloc memory");
716                         return NULL;
717                 }
718                 ret_str = temp_str;
719         }
720
721         // incorporate the plugin instance if any
722         if (n->plugin_instance[0] != 0) {
723                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, n->plugin_instance);
724                 free(ret_str);
725                 if (res == -1) {
726                         ERROR("write_sensu plugin: Unable to alloc memory");
727                         return NULL;
728                 }
729                 ret_str = temp_str;
730         }
731
732         // incorporate the plugin type instance if any
733         if (n->type_instance[0] != 0) {
734                 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, n->type_instance);
735                 free(ret_str);
736                 if (res == -1) {
737                         ERROR("write_sensu plugin: Unable to alloc memory");
738                         return NULL;
739                 }
740                 ret_str = temp_str;
741         }
742
743         // add key value attributes from config if any
744         for (i = 0; i < sensu_attrs_num; i += 2) {
745                 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
746                 free(ret_str);
747                 if (res == -1) {
748                         ERROR("write_sensu plugin: Unable to alloc memory");
749                         return NULL;
750                 }
751                 ret_str = temp_str;
752         }
753
754         // incorporate sensu tags from config if any
755         if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
756                 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
757                 free(ret_str);
758                 if (res == -1) {
759                         ERROR("write_sensu plugin: Unable to alloc memory");
760                         return NULL;
761                 }
762                 ret_str = temp_str;
763         }
764
765         // incorporate the service name
766         sensu_format_name2(service_buffer, sizeof(service_buffer),
767                                 /* host */ "", n->plugin, n->plugin_instance,
768                                 n->type, n->type_instance, host->separator);
769         // replace sensu event name chars that are considered illegal
770         in_place_replace_sensu_name_reserved(service_buffer);
771         res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str, &service_buffer[1]);
772         free(ret_str);
773         if (res == -1) {
774                 ERROR("write_sensu plugin: Unable to alloc memory");
775                 return NULL;
776         }
777         ret_str = temp_str;
778
779         // incorporate the check output
780         if (n->message[0] != 0) {
781                 char *msg = replace_json_reserved(n->message);
782                 if (msg == NULL) {
783                         ERROR("write_sensu plugin: Unable to alloc memory");
784                         free(ret_str);
785                         return NULL;
786                 }
787                 res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str, severity, msg);
788                 free(ret_str);
789                 free(msg);
790                 if (res == -1) {
791                         ERROR("write_sensu plugin: Unable to alloc memory");
792                         return NULL;
793                 }
794                 ret_str = temp_str;
795         }
796
797         // Pull in values from threshold and add extra attributes
798         for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
799                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) {
800                         res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str, meta->nm_value.nm_double);
801                         free(ret_str);
802                         if (res == -1) {
803                                 ERROR("write_sensu plugin: Unable to alloc memory");
804                                 return NULL;
805                         }
806                         ret_str = temp_str;
807                 }
808                 if (meta->type == NM_TYPE_STRING) {
809                         res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name, meta->nm_value.nm_string);
810                         free(ret_str);
811                         if (res == -1) {
812                                 ERROR("write_sensu plugin: Unable to alloc memory");
813                                 return NULL;
814                         }
815                         ret_str = temp_str;
816                 }
817         }
818
819         // close the curly bracket
820         res = my_asprintf(&temp_str, "%s}\n", ret_str);
821         free(ret_str);
822         if (res == -1) {
823                 ERROR("write_sensu plugin: Unable to alloc memory");
824                 return NULL;
825         }
826         ret_str = temp_str;
827
828         DEBUG("write_sensu plugin: Successfully created JSON for notification: "
829                                 "host = \"%s\", service = \"%s\", state = \"%s\"",
830                                 n->host, service_buffer, severity);
831         return ret_str;
832 } /* }}} char *sensu_notification_to_json */
833
834 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
835 {
836         int status = 0;
837         size_t  buffer_len;
838
839         status = sensu_connect(host);
840         if (status != 0)
841                 return status;
842
843         buffer_len = strlen(msg);
844
845         status = (int) swrite(host->s, msg, buffer_len);
846         sensu_close_socket(host);
847
848         if (status != 0) {
849                 char errbuf[1024];
850                 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
851                                 (host->node != NULL) ? host->node : SENSU_HOST,
852                                 (host->service != NULL) ? host->service : SENSU_PORT,
853                                 sstrerror(errno, errbuf, sizeof(errbuf)));
854                 return -1;
855         }
856
857         return 0;
858 } /* }}} int sensu_send_msg */
859
860
861 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
862 {
863         int status = 0;
864
865         status = sensu_send_msg(host, msg);
866         if (status != 0) {
867                 host->flags &= ~F_READY;
868                 if (host->res != NULL) {
869                         freeaddrinfo(host->res);
870                         host->res = NULL;
871                 }
872                 return status;
873         }
874
875         return 0;
876 } /* }}} int sensu_send */
877
878
879 static int sensu_write(const data_set_t *ds, /* {{{ */
880               const value_list_t *vl,
881               user_data_t *ud)
882 {
883         int status = 0;
884         int statuses[vl->values_len];
885         struct sensu_host       *host = ud->data;
886         gauge_t *rates = NULL;
887         char *msg;
888
889         pthread_mutex_lock(&host->lock);
890         memset(statuses, 0, vl->values_len * sizeof(*statuses));
891
892         if (host->store_rates) {
893                 rates = uc_get_rate(ds, vl);
894                 if (rates == NULL) {
895                         ERROR("write_sensu plugin: uc_get_rate failed.");
896                         pthread_mutex_unlock(&host->lock);
897                         return -1;
898                 }
899         }
900         for (size_t i = 0; i < vl->values_len; i++) {
901                 msg = sensu_value_to_json(host, ds, vl, (int) i, rates, statuses[i]);
902                 if (msg == NULL) {
903                         sfree(rates);
904                         pthread_mutex_unlock(&host->lock);
905                         return -1;
906                 }
907                 status = sensu_send(host, msg);
908                 free(msg);
909                 if (status != 0) {
910                         ERROR("write_sensu plugin: sensu_send failed with status %i", status);
911                         pthread_mutex_unlock(&host->lock);
912                         sfree(rates);
913                         return status;
914                 }
915         }
916         sfree(rates);
917         pthread_mutex_unlock(&host->lock);
918         return status;
919 } /* }}} int sensu_write */
920
921 static int sensu_notification(const notification_t *n, user_data_t *ud) /* {{{ */
922 {
923         int     status;
924         struct sensu_host *host = ud->data;
925         char *msg;
926
927         pthread_mutex_lock(&host->lock);
928
929         msg = sensu_notification_to_json(host, n);
930         if (msg == NULL) {
931                 pthread_mutex_unlock(&host->lock);
932                 return -1;
933         }
934
935         status = sensu_send(host, msg);
936         free(msg);
937         if (status != 0)
938                 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
939         pthread_mutex_unlock(&host->lock);
940
941         return status;
942 } /* }}} int sensu_notification */
943
944 static void sensu_free(void *p) /* {{{ */
945 {
946         struct sensu_host *host = p;
947
948         if (host == NULL)
949                 return;
950
951         pthread_mutex_lock(&host->lock);
952
953         host->reference_count--;
954         if (host->reference_count > 0) {
955                 pthread_mutex_unlock(&host->lock);
956                 return;
957         }
958
959         sensu_close_socket(host);
960         if (host->res != NULL) {
961                 freeaddrinfo(host->res);
962                 host->res = NULL;
963         }
964         sfree(host->service);
965         sfree(host->event_service_prefix);
966         sfree(host->name);
967         sfree(host->node);
968         sfree(host->separator);
969         free_str_list(&(host->metric_handlers));
970         free_str_list(&(host->notification_handlers));
971         pthread_mutex_destroy(&host->lock);
972         sfree(host);
973 } /* }}} void sensu_free */
974
975
976 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
977 {
978         struct sensu_host       *host = NULL;
979         int                                     status = 0;
980         oconfig_item_t          *child;
981         char                            callback_name[DATA_MAX_NAME_LEN];
982         user_data_t                     ud;
983
984         if ((host = calloc(1, sizeof(*host))) == NULL) {
985                 ERROR("write_sensu plugin: calloc failed.");
986                 return ENOMEM;
987         }
988         pthread_mutex_init(&host->lock, NULL);
989         host->reference_count = 1;
990         host->node = NULL;
991         host->service = NULL;
992         host->notifications = 0;
993         host->metrics = 0;
994         host->store_rates = 1;
995         host->always_append_ds = 0;
996         host->metric_handlers.nb_strs = 0;
997         host->metric_handlers.strs = NULL;
998         host->notification_handlers.nb_strs = 0;
999         host->notification_handlers.strs = NULL;
1000         host->separator = strdup("/");
1001         if (host->separator == NULL) {
1002                 ERROR("write_sensu plugin: Unable to alloc memory");
1003                 sensu_free(host);
1004                 return -1;
1005         }
1006
1007         status = cf_util_get_string(ci, &host->name);
1008         if (status != 0) {
1009                 WARNING("write_sensu plugin: Required host name is missing.");
1010                 sensu_free(host);
1011                 return -1;
1012         }
1013
1014         for (int i = 0; i < ci->children_num; i++) {
1015                 child = &ci->children[i];
1016                 status = 0;
1017
1018                 if (strcasecmp("Host", child->key) == 0) {
1019                         status = cf_util_get_string(child, &host->node);
1020                         if (status != 0)
1021                                 break;
1022                 } else if (strcasecmp("Notifications", child->key) == 0) {
1023                         status = cf_util_get_boolean(child, &host->notifications);
1024                         if (status != 0)
1025                                 break;
1026                 } else if (strcasecmp("Metrics", child->key) == 0) {
1027                                         status = cf_util_get_boolean(child, &host->metrics);
1028                                         if (status != 0)
1029                                                 break;
1030                 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1031                         status = cf_util_get_string(child, &host->event_service_prefix);
1032                         if (status != 0)
1033                                 break;
1034                 } else if (strcasecmp("Separator", child->key) == 0) {
1035                                 status = cf_util_get_string(child, &host->separator);
1036                                 if (status != 0)
1037                                         break;
1038                 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1039                         char *temp_str = NULL;
1040                         status = cf_util_get_string(child, &temp_str);
1041                         if (status != 0)
1042                                 break;
1043                         status = add_str_to_list(&(host->metric_handlers), temp_str);
1044                         free(temp_str);
1045                         if (status != 0)
1046                                 break;
1047                 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1048                         char *temp_str = NULL;
1049                         status = cf_util_get_string(child, &temp_str);
1050                         if (status != 0)
1051                                 break;
1052                         status = add_str_to_list(&(host->notification_handlers), temp_str);
1053                         free(temp_str);
1054                         if (status != 0)
1055                                 break;
1056                 } else if (strcasecmp("Port", child->key) == 0) {
1057                         status = cf_util_get_service(child, &host->service);
1058                         if (status != 0) {
1059                                 ERROR("write_sensu plugin: Invalid argument "
1060                                                 "configured for the \"Port\" "
1061                                                 "option.");
1062                                 break;
1063                         }
1064                 } else if (strcasecmp("StoreRates", child->key) == 0) {
1065                         status = cf_util_get_boolean(child, &host->store_rates);
1066                         if (status != 0)
1067                                 break;
1068                 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1069                         status = cf_util_get_boolean(child,
1070                                         &host->always_append_ds);
1071                         if (status != 0)
1072                                 break;
1073                 } else {
1074                         WARNING("write_sensu plugin: ignoring unknown config "
1075                                 "option: \"%s\"", child->key);
1076                 }
1077         }
1078         if (status != 0) {
1079                 sensu_free(host);
1080                 return status;
1081         }
1082
1083         if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1084                         sensu_free(host);
1085                         WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. Giving up.");
1086                         return -1;
1087                 }
1088
1089         if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1090                 sensu_free(host);
1091                 WARNING("write_sensu plugin: notifications enabled but no NotificationHandler defined. Giving up.");
1092                 return -1;
1093         }
1094
1095         if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1096                 WARNING("write_sensu plugin: NotificationHandler given so forcing notifications to be enabled");
1097                 host->notifications = 1;
1098         }
1099
1100         if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1101                 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be enabled");
1102                 host->metrics = 1;
1103         }
1104
1105         if (!(host->notifications || host->metrics)) {
1106                 WARNING("write_sensu plugin: neither metrics nor notifications enabled. Giving up.");
1107                 sensu_free(host);
1108                 return -1;
1109         }
1110
1111         ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1112         ud.data = host;
1113         ud.free_func = sensu_free;
1114
1115         pthread_mutex_lock(&host->lock);
1116
1117         if (host->metrics) {
1118                 status = plugin_register_write(callback_name, sensu_write, &ud);
1119                 if (status != 0)
1120                         WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1121                                         "failed with status %i.",
1122                                         callback_name, status);
1123                 else /* success */
1124                         host->reference_count++;
1125         }
1126
1127         if (host->notifications) {
1128                 status = plugin_register_notification(callback_name, sensu_notification, &ud);
1129                 if (status != 0)
1130                         WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1131                                         "failed with status %i.",
1132                                         callback_name, status);
1133                 else
1134                         host->reference_count++;
1135         }
1136
1137         if (host->reference_count <= 1) {
1138                 /* Both callbacks failed => free memory.
1139                  * We need to unlock here, because sensu_free() will lock.
1140                  * This is not a race condition, because we're the only one
1141                  * holding a reference. */
1142                 pthread_mutex_unlock(&host->lock);
1143                 sensu_free(host);
1144                 return -1;
1145         }
1146
1147         host->reference_count--;
1148         pthread_mutex_unlock(&host->lock);
1149
1150         return status;
1151 } /* }}} int sensu_config_node */
1152
1153 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1154 {
1155         oconfig_item_t  *child;
1156         int              status;
1157         struct str_list sensu_tags_arr;
1158
1159         sensu_tags_arr.nb_strs = 0;
1160         sensu_tags_arr.strs = NULL;
1161
1162         for (int i = 0; i < ci->children_num; i++)  {
1163                 child = &ci->children[i];
1164
1165                 if (strcasecmp("Node", child->key) == 0) {
1166                         sensu_config_node(child);
1167                 } else if (strcasecmp(child->key, "attribute") == 0) {
1168                         if (child->values_num != 2) {
1169                                 WARNING("sensu attributes need both a key and a value.");
1170                                 continue;
1171                         }
1172                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
1173                                         child->values[1].type != OCONFIG_TYPE_STRING) {
1174                                 WARNING("sensu attribute needs string arguments.");
1175                                 continue;
1176                         }
1177
1178                         strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[0].value.string);
1179                         strarray_add(&sensu_attrs, &sensu_attrs_num, child->values[1].value.string);
1180
1181                         DEBUG("write_sensu plugin: New attribute: %s => %s",
1182                                         child->values[0].value.string,
1183                                         child->values[1].value.string);
1184                 } else if (strcasecmp(child->key, "tag") == 0) {
1185                         char *tmp = NULL;
1186                         status = cf_util_get_string(child, &tmp);
1187                         if (status != 0)
1188                                 continue;
1189
1190                         status = add_str_to_list(&sensu_tags_arr, tmp);
1191                         sfree(tmp);
1192                         if (status != 0)
1193                                 continue;
1194                         DEBUG("write_sensu plugin: Got tag: %s", tmp);
1195                 } else {
1196                         WARNING("write_sensu plugin: Ignoring unknown "
1197                                  "configuration option \"%s\" at top level.",
1198                                  child->key);
1199                 }
1200         }
1201         if (sensu_tags_arr.nb_strs > 0) {
1202                 sfree (sensu_tags);
1203                 sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1204                 free_str_list(&sensu_tags_arr);
1205                 if (sensu_tags == NULL) {
1206                         ERROR("write_sensu plugin: Unable to alloc memory");
1207                         return -1;
1208                 }
1209         }
1210         return 0;
1211 } /* }}} int sensu_config */
1212
1213 void module_register(void)
1214 {
1215         plugin_register_complex_config("write_sensu", sensu_config);
1216 }
1217
1218 /* vim: set sw=8 sts=8 ts=8 noet : */