Merge remote-tracking branch 'origin/pr/925'
[collectd.git] / src / write_sensu.c
1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26
27 #include "collectd.h"
28 #include "plugin.h"
29 #include "common.h"
30 #include "configfile.h"
31 #include "utils_cache.h"
32 #include <sys/socket.h>
33 #include <arpa/inet.h>
34 #include <errno.h>
35 #include <netdb.h>
36 #include <inttypes.h>
37 #include <pthread.h>
38 #include <stddef.h>
39
40 #include <stdlib.h>
41 #ifndef HAVE_ASPRINTF
42 /*
43  * Uses asprintf() portable implementation from
44  * https://github.com/littlstar/asprintf.c/blob/master/
45  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
46  */
47 #include <stdio.h>
48 #include <stdarg.h>
49
50 int vasprintf(char **str, const char *fmt, va_list args) {
51         int size = 0;
52         va_list tmpa;
53         // copy
54         va_copy(tmpa, args);
55         // apply variadic arguments to
56         // sprintf with format to get size
57         size = vsnprintf(NULL, size, fmt, tmpa);
58         // toss args
59         va_end(tmpa);
60         // return -1 to be compliant if
61         // size is less than 0
62         if (size < 0) { return -1; }
63         // alloc with size plus 1 for `\0'
64         *str = (char *) malloc(size + 1);
65         // return -1 to be compliant
66         // if pointer is `NULL'
67         if (NULL == *str) { return -1; }
68         // format string with original
69         // variadic arguments and set new size
70         size = vsprintf(*str, fmt, args);
71         return size;
72 }
73
74 int asprintf(char **str, const char *fmt, ...) {
75         int size = 0;
76         va_list args;
77         // init variadic argumens
78         va_start(args, fmt);
79         // format and get size
80         size = vasprintf(str, fmt, args);
81         // toss args
82         va_end(args);
83         return size;
84 }
85
86 #endif
87
88 #define SENSU_HOST              "localhost"
89 #define SENSU_PORT              "3030"
90
91 struct str_list {
92         int nb_strs;
93         char **strs;
94 };
95
96 struct sensu_host {
97         char                    *name;
98         char                    *event_service_prefix;
99         struct str_list metric_handlers;
100         struct str_list notification_handlers;
101 #define F_READY      0x01
102         uint8_t                  flags;
103         pthread_mutex_t  lock;
104         _Bool            notifications;
105         _Bool            metrics;
106         _Bool                    store_rates;
107         _Bool                    always_append_ds;
108         char                    *separator;
109         char                    *node;
110         char                    *service;
111         int              s;
112         struct addrinfo *res;
113         int                          reference_count;
114 };
115
116 static char     *sensu_tags;
117 static char     **sensu_attrs;
118 static size_t sensu_attrs_num;
119 static const char *alloc_err ="write_sensu plugin: Unable to alloc memory";
120
121 static int add_str_to_list(struct str_list *strs,
122                 const char *str_to_add) /* {{{ */
123 {
124         char **old_strs_ptr = strs->strs;
125         char *newstr = strdup(str_to_add);
126         if (newstr == NULL) {
127                 ERROR(alloc_err);
128                 return -1;
129         }
130         strs->strs = realloc(strs->strs, sizeof(char *) *(strs->nb_strs + 1));
131         if (strs->strs == NULL) {
132                 strs->strs = old_strs_ptr;
133                 free(newstr);
134                 ERROR(alloc_err);
135                 return -1;
136         }
137         strs->strs[strs->nb_strs] = newstr;
138         strs->nb_strs++;
139         return 0;
140 }
141 /* }}} int add_str_to_list */
142
143 static void free_str_list(struct str_list *strs) /* {{{ */
144 {
145         int i;
146         for (i=0; i<strs->nb_strs; i++)
147                 free(strs->strs[i]);
148         free(strs->strs);
149 }
150 /* }}} void free_str_list */
151
152 static int sensu_connect(struct sensu_host *host) /* {{{ */
153 {
154         int                      e;
155         struct addrinfo         *ai, hints;
156         char const              *node;
157         char const              *service;
158
159         // Resolve the target if we haven't done already
160         if (!(host->flags & F_READY)) {
161                 memset(&hints, 0, sizeof(hints));
162                 memset(&service, 0, sizeof(service));
163                 host->res = NULL;
164                 hints.ai_family = AF_INET;
165                 hints.ai_socktype = SOCK_STREAM;
166 #ifdef AI_ADDRCONFIG
167                 hints.ai_flags |= AI_ADDRCONFIG;
168 #endif
169
170                 node = (host->node != NULL) ? host->node : SENSU_HOST;
171                 service = (host->service != NULL) ? host->service : SENSU_PORT;
172
173                 if ((e = getaddrinfo(node, service, &hints, &(host->res))) != 0) {
174                         ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s",
175                                         node, gai_strerror(e));
176                         return -1;
177                 }
178                 DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s",
179                                 node, service);
180                 host->flags |= F_READY;
181         }
182
183         struct linger so_linger;
184         host->s = -1;
185         for (ai = host->res; ai != NULL; ai = ai->ai_next) {
186                 // create the socket
187                 if ((host->s = socket(ai->ai_family,
188                                       ai->ai_socktype,
189                                       ai->ai_protocol)) == -1) {
190                         continue;
191                 }
192
193                 // Set very low close() lingering
194                 so_linger.l_onoff = 1;
195                 so_linger.l_linger = 3;
196                 if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger) != 0)
197                         WARNING("write_sensu plugin: failed to set socket close() lingering");
198
199                 // connect the socket
200                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
201                         close(host->s);
202                         host->s = -1;
203                         continue;
204                 }
205                 DEBUG("write_sensu plugin: connected");
206                 break;
207         }
208
209         if (host->s < 0) {
210                 WARNING("write_sensu plugin: Unable to connect to sensu client");
211                 return -1;
212         }
213         return 0;
214 } /* }}} int sensu_connect */
215
216 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
217 {
218         if (host->s != -1)
219                 close(host->s);
220         host->s = -1;
221
222 } /* }}} void sensu_close_socket */
223
224 static char *build_json_str_list(const char *tag, struct str_list const *list) /* {{{ */
225 {
226         int res;
227         char *ret_str;
228         char *temp_str;
229         int i;
230         if (list->nb_strs == 0) {
231                 ret_str = malloc(sizeof(char));
232                 if (ret_str == NULL) {
233                         ERROR(alloc_err);
234                         return NULL;
235                 }
236                 ret_str[0] = '\0';
237         }
238
239         res = asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
240         if (res == -1) {
241                 ERROR(alloc_err);
242                 return NULL;
243         }
244         for (i=1; i<list->nb_strs; i++) {
245                 res = asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
246                 free(temp_str);
247                 if (res == -1) {
248                         ERROR(alloc_err);
249                         return NULL;
250                 }
251                 temp_str = ret_str;
252         }
253         res = asprintf(&ret_str, "%s]", temp_str);
254         free(temp_str);
255         if (res == -1) {
256                 ERROR(alloc_err);
257                 return NULL;
258         }
259
260         return ret_str;
261 } /* }}} char *build_json_str_list*/
262
263 int sensu_format_name2(char *ret, int ret_len,
264                 const char *hostname,
265                 const char *plugin, const char *plugin_instance,
266                 const char *type, const char *type_instance,
267                 const char *separator)
268 {
269         char *buffer;
270         size_t buffer_size;
271
272         buffer = ret;
273         buffer_size = (size_t) ret_len;
274
275 #define APPEND(str) do {          \
276         size_t l = strlen (str);        \
277         if (l >= buffer_size)           \
278                 return (ENOBUFS);             \
279         memcpy (buffer, (str), l);      \
280         buffer += l; buffer_size -= l;  \
281 } while (0)
282
283         assert (plugin != NULL);
284         assert (type != NULL);
285
286         APPEND (hostname);
287         APPEND (separator);
288         APPEND (plugin);
289         if ((plugin_instance != NULL) && (plugin_instance[0] != 0))
290         {
291                 APPEND ("-");
292                 APPEND (plugin_instance);
293         }
294         APPEND (separator);
295         APPEND (type);
296         if ((type_instance != NULL) && (type_instance[0] != 0))
297         {
298                 APPEND ("-");
299                 APPEND (type_instance);
300         }
301         assert (buffer_size > 0);
302         buffer[0] = 0;
303
304 #undef APPEND
305         return (0);
306 } /* int sensu_format_name2 */
307
308 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
309 {
310         int i;
311         int len=strlen(orig_name);
312         for (i=0; i<len; i++) {
313                 // some plugins like ipmi generate special characters in metric name
314                 switch(orig_name[i]) {
315                         case '(': orig_name[i] = '_'; break;
316                         case ')': orig_name[i] = '_'; break;
317                         case ' ': orig_name[i] = '_'; break;
318                         case '"': orig_name[i] = '_'; break;
319                         case '\'': orig_name[i] = '_'; break;
320                         case '+': orig_name[i] = '_'; break;
321                 }
322         }
323 } /* }}} char *replace_sensu_name_reserved */
324
325 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
326                 data_set_t const *ds,
327                 value_list_t const *vl, size_t index,
328                 gauge_t const *rates,
329                 int status)
330 {
331         char name_buffer[5 * DATA_MAX_NAME_LEN];
332         char service_buffer[6 * DATA_MAX_NAME_LEN];
333         int i;
334         char *ret_str;
335         char *temp_str;
336         char *value_str;
337         int res;
338         // First part of the JSON string
339         const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
340
341         char *handlers_str = build_json_str_list("handlers", &(host->metric_handlers));
342         if (handlers_str == NULL) {
343                 ERROR(alloc_err);
344                 return NULL;
345         }
346
347         // incorporate the handlers
348         if (strlen(handlers_str) == 0) {
349                 free(handlers_str);
350                 ret_str = strdup(part1);
351                 if (ret_str == NULL) {
352                         ERROR(alloc_err);
353                         return NULL;
354                 }
355         }
356         else {
357                 res = asprintf(&ret_str, "%s, %s", part1, handlers_str);
358                 free(handlers_str);
359                 if (res == -1) {
360                         ERROR(alloc_err);
361                         return NULL;
362                 }
363         }
364
365         // incorporate the plugin name information
366         res = asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, vl->plugin);
367         free(ret_str);
368         if (res == -1) {
369                 ERROR(alloc_err);
370                 return NULL;
371         }
372         ret_str = temp_str;
373
374         // incorporate the plugin type
375         res = asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, vl->type);
376         free(ret_str);
377         if (res == -1) {
378                 ERROR(alloc_err);
379                 return NULL;
380         }
381         ret_str = temp_str;
382
383         // incorporate the plugin instance if any
384         if (vl->plugin_instance[0] != 0) {
385                 res = asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, vl->plugin_instance);
386                 free(ret_str);
387                 if (res == -1) {
388                         ERROR(alloc_err);
389                         return NULL;
390                 }
391                 ret_str = temp_str;
392         }
393
394         // incorporate the plugin type instance if any
395         if (vl->type_instance[0] != 0) {
396                 res = asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, vl->type_instance);
397                 free(ret_str);
398                 if (res == -1) {
399                         ERROR(alloc_err);
400                         return NULL;
401                 }
402                 ret_str = temp_str;
403         }
404
405         // incorporate the data source type
406         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
407                 char ds_type[DATA_MAX_NAME_LEN];
408                 ssnprintf (ds_type, sizeof (ds_type), "%s:rate", DS_TYPE_TO_STRING(ds->ds[index].type));
409                 res = asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, ds_type);
410                 free(ret_str);
411                 if (res == -1) {
412                         ERROR(alloc_err);
413                         return NULL;
414                 }
415                 ret_str = temp_str;
416         } else {
417                 res = asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"", ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
418                 free(ret_str);
419                 if (res == -1) {
420                         ERROR(alloc_err);
421                         return NULL;
422                 }
423                 ret_str = temp_str;
424         }
425
426         // incorporate the data source name
427         res = asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"", ret_str, ds->ds[index].name);
428         free(ret_str);
429         if (res == -1) {
430                 ERROR(alloc_err);
431                 return NULL;
432         }
433         ret_str = temp_str;
434
435         // incorporate the data source index
436         {
437                 char ds_index[DATA_MAX_NAME_LEN];
438                 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
439                 res = asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s", ret_str, ds_index);
440                 free(ret_str);
441                 if (res == -1) {
442                         ERROR(alloc_err);
443                         return NULL;
444                 }
445                 ret_str = temp_str;
446         }
447
448         // add key value attributes from config if any
449         for (i = 0; i < sensu_attrs_num; i += 2) {
450                 res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
451                 free(ret_str);
452                 if (res == -1) {
453                         ERROR(alloc_err);
454                         return NULL;
455                 }
456                 ret_str = temp_str;
457         }
458
459         // incorporate sensu tags from config if any
460         if (strlen(sensu_tags) != 0) {
461                 res = asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
462                 free(ret_str);
463                 if (res == -1) {
464                         ERROR(alloc_err);
465                         return NULL;
466                 }
467                 ret_str = temp_str;
468         }
469
470         // calculate the value and set to a string
471         if (ds->ds[index].type == DS_TYPE_GAUGE) {
472                 double tmp_v = (double) vl->values[index].gauge;
473                 res = asprintf(&value_str, "%.8f", tmp_v, sensu_tags);
474                 if (res == -1) {
475                         free(ret_str);
476                         ERROR(alloc_err);
477                         return NULL;
478                 }
479         } else if (rates != NULL) {
480                 double tmp_v = (double) rates[index];
481                 res = asprintf(&value_str, "%.8f", tmp_v, sensu_tags);
482                 if (res == -1) {
483                         free(ret_str);
484                         ERROR(alloc_err);
485                         return NULL;
486                 }
487         } else {
488                 int64_t tmp_v;
489                 if (ds->ds[index].type == DS_TYPE_DERIVE)
490                         tmp_v = (int64_t) vl->values[index].derive;
491                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
492                         tmp_v = (int64_t) vl->values[index].absolute;
493                 else
494                         tmp_v = (int64_t) vl->values[index].counter;
495                 res = asprintf(&value_str, "%lld", tmp_v, sensu_tags);
496                 if (res == -1) {
497                         free(ret_str);
498                         ERROR(alloc_err);
499                         return NULL;
500                 }
501         }
502
503         // Generate the full service name
504         sensu_format_name2(name_buffer, sizeof(name_buffer),
505                 vl->host, vl->plugin, vl->plugin_instance,
506                 vl->type, vl->type_instance, host->separator);
507         if (host->always_append_ds || (ds->ds_num > 1)) {
508                 if (host->event_service_prefix == NULL)
509                         ssnprintf(service_buffer, sizeof(service_buffer), "%s.%s",
510                                         name_buffer, ds->ds[index].name);
511                 else
512                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
513                                         host->event_service_prefix, name_buffer, ds->ds[index].name);
514         } else {
515                 if (host->event_service_prefix == NULL)
516                         sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
517                 else
518                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
519                                         host->event_service_prefix, name_buffer);
520         }
521
522         // Replace collectd sensor name reserved characters so that time series DB is happy
523         in_place_replace_sensu_name_reserved(service_buffer);
524
525         // finalize the buffer by setting the output and closing curly bracket
526         res = asprintf(&temp_str, "%s, \"output\": \"%s %s %ld\"}\n", ret_str, service_buffer, value_str, CDTIME_T_TO_TIME_T(vl->time));
527         free(ret_str);
528         free(value_str);
529         if (res == -1) {
530                 ERROR(alloc_err);
531                 return NULL;
532         }
533         ret_str = temp_str;
534
535         DEBUG("write_sensu plugin: Successfully created json for metric: "
536                         "host = \"%s\", service = \"%s\"",
537                         vl->host, service_buffer);
538         return ret_str;
539 } /* }}} char *sensu_value_to_json */
540
541 /*
542  * Uses replace_str2() implementation from
543  * http://creativeandcritical.net/str-replace-c/
544  * copyright (c) Laird Shaw, under public domain.
545  */
546 char *replace_str(const char *str, const char *old, /* {{{ */
547                 const char *new)
548 {
549         char *ret, *r;
550         const char *p, *q;
551         size_t oldlen = strlen(old);
552         size_t count = strlen(new);
553         size_t retlen = count;
554         size_t newlen = count;
555         int samesize = (oldlen == newlen);
556
557         if (!samesize) {
558                 for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
559                         count++;
560                 /* This is undefined if p - str > PTRDIFF_MAX */
561                 retlen = p - str + strlen(p) + count * (newlen - oldlen);
562         } else
563                 retlen = strlen(str);
564
565         ret = malloc(retlen + 1);
566         if (ret == NULL)
567                 return NULL;
568         // added to original: not optimized, but keeps valgrind happy.
569         memset(ret, 0, retlen + 1);
570
571         r = ret;
572         p = str;
573         while (1) {
574                 /* If the old and new strings are different lengths - in other
575                  * words we have already iterated through with strstr above,
576                  * and thus we know how many times we need to call it - then we
577                  * can avoid the final (potentially lengthy) call to strstr,
578                  * which we already know is going to return NULL, by
579                  * decrementing and checking count.
580                  */
581                 if (!samesize && !count--)
582                         break;
583                 /* Otherwise i.e. when the old and new strings are the same
584                  * length, and we don't know how many times to call strstr,
585                  * we must check for a NULL return here (we check it in any
586                  * event, to avoid further conditions, and because there's
587                  * no harm done with the check even when the old and new
588                  * strings are different lengths).
589                  */
590                 if ((q = strstr(p, old)) == NULL)
591                         break;
592                 /* This is undefined if q - p > PTRDIFF_MAX */
593                 ptrdiff_t l = q - p;
594                 memcpy(r, p, l);
595                 r += l;
596                 memcpy(r, new, newlen);
597                 r += newlen;
598                 p = q + oldlen;
599         }
600         strncpy(r, p, strlen(p));
601
602         return ret;
603 } /* }}} char *replace_str */
604
605 static char *replace_json_reserved(const char *message) /* {{{ */
606 {
607         char *msg = replace_str(message, "\\", "\\\\");
608         if (msg == NULL) {
609                 ERROR(alloc_err);
610                 return NULL;
611         }
612         char *tmp = replace_str(msg, "\"", "\\\"");
613         free(msg);
614         if (tmp == NULL) {
615                 ERROR(alloc_err);
616                 return NULL;
617         }
618         msg = replace_str(tmp, "\n", "\\\n");
619         free(tmp);
620         if (msg == NULL) {
621                 ERROR(alloc_err);
622                 return NULL;
623         }
624         return msg;
625 } /* }}} char *replace_json_reserved */
626
627 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
628                 notification_t const *n)
629 {
630         char service_buffer[6 * DATA_MAX_NAME_LEN];
631         char const *severity;
632         notification_meta_t *meta;
633         char *ret_str;
634         char *temp_str;
635         int status;
636         int i;
637         int res;
638         // add the severity/status
639         switch (n->severity) {
640                 case NOTIF_OKAY:
641                         severity = "OK";
642                         status = 0;
643                         break;
644                 case NOTIF_WARNING:
645                         severity = "WARNING";
646                         status = 1;
647                         break;
648                 case NOTIF_FAILURE:
649                         severity = "CRITICAL";
650                         status = 2;
651                         break;
652                 default:
653                         severity = "UNKNOWN";
654                         status = 3;
655         }
656         res = asprintf(&temp_str, "{\"status\": %d", status);
657         if (res == -1) {
658                 ERROR(alloc_err);
659                 return NULL;
660         }
661         ret_str = temp_str;
662
663         // incorporate the timestamp
664         res = asprintf(&temp_str, "%s, \"timestamp\": %ld", ret_str, CDTIME_T_TO_TIME_T(n->time));
665         free(ret_str);
666         if (res == -1) {
667                 ERROR(alloc_err);
668                 return NULL;
669         }
670         ret_str = temp_str;
671
672         char *handlers_str = build_json_str_list("handlers", &(host->notification_handlers));
673         if (handlers_str == NULL) {
674                 ERROR(alloc_err);
675                 return NULL;
676         }
677         // incorporate the handlers
678         if (strlen(handlers_str) != 0) {
679                 res = asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
680                 free(ret_str);
681                 free(handlers_str);
682                 if (res == -1) {
683                         ERROR(alloc_err);
684                         return NULL;
685                 }
686                 ret_str = temp_str;
687         } else {
688                 free(handlers_str);
689         }
690
691         // incorporate the plugin name information if any
692         if (n->plugin[0] != 0) {
693                 res = asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str, n->plugin);
694                 free(ret_str);
695                 if (res == -1) {
696                         ERROR(alloc_err);
697                         return NULL;
698                 }
699                 ret_str = temp_str;
700         }
701
702         // incorporate the plugin type if any
703         if (n->type[0] != 0) {
704                 res = asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str, n->type);
705                 free(ret_str);
706                 if (res == -1) {
707                         ERROR(alloc_err);
708                         return NULL;
709                 }
710                 ret_str = temp_str;
711         }
712
713         // incorporate the plugin instance if any
714         if (n->plugin_instance[0] != 0) {
715                 res = asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"", ret_str, n->plugin_instance);
716                 free(ret_str);
717                 if (res == -1) {
718                         ERROR(alloc_err);
719                         return NULL;
720                 }
721                 ret_str = temp_str;
722         }
723
724         // incorporate the plugin type instance if any
725         if (n->type_instance[0] != 0) {
726                 res = asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"", ret_str, n->type_instance);
727                 free(ret_str);
728                 if (res == -1) {
729                         ERROR(alloc_err);
730                         return NULL;
731                 }
732                 ret_str = temp_str;
733         }
734
735         // add key value attributes from config if any
736         for (i = 0; i < sensu_attrs_num; i += 2) {
737                 res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i], sensu_attrs[i+1]);
738                 free(ret_str);
739                 if (res == -1) {
740                         ERROR(alloc_err);
741                         return NULL;
742                 }
743                 ret_str = temp_str;
744         }
745
746         // incorporate sensu tags from config if any
747         if (strlen(sensu_tags) != 0) {
748                 res = asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
749                 free(ret_str);
750                 if (res == -1) {
751                         ERROR(alloc_err);
752                         return NULL;
753                 }
754                 ret_str = temp_str;
755         }
756
757         // incorporate the service name
758         sensu_format_name2(service_buffer, sizeof(service_buffer),
759                                 /* host */ "", n->plugin, n->plugin_instance,
760                                 n->type, n->type_instance, host->separator);
761         // replace sensu event name chars that are considered illegal
762         in_place_replace_sensu_name_reserved(service_buffer);
763         res = asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str, &service_buffer[1]);
764         free(ret_str);
765         if (res == -1) {
766                 ERROR(alloc_err);
767                 return NULL;
768         }
769         ret_str = temp_str;
770
771         // incorporate the check output
772         if (n->message[0] != 0) {
773                 char *msg = replace_json_reserved(n->message);
774                 if (msg == NULL) {
775                         ERROR(alloc_err);
776                         return NULL;
777                 }
778                 res = asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str, severity, msg);
779                 free(ret_str);
780                 free(msg);
781                 if (res == -1) {
782                         ERROR(alloc_err);
783                         return NULL;
784                 }
785                 ret_str = temp_str;
786         }
787
788         // Pull in values from threshold and add extra attributes
789         for (meta = n->meta; meta != NULL; meta = meta->next) {
790                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) {
791                         res = asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str, meta->nm_value.nm_double);
792                         free(ret_str);
793                         if (res == -1) {
794                                 ERROR(alloc_err);
795                                 return NULL;
796                         }
797                         ret_str = temp_str;
798                 }
799                 if (meta->type == NM_TYPE_STRING) {
800                         res = asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name, meta->nm_value.nm_string);
801                         free(ret_str);
802                         if (res == -1) {
803                                 ERROR(alloc_err);
804                                 return NULL;
805                         }
806                         ret_str = temp_str;
807                 }
808         }
809
810         // close the curly bracket
811         res = asprintf(&temp_str, "%s}\n", ret_str);
812         free(ret_str);
813         if (res == -1) {
814                 ERROR(alloc_err);
815                 return NULL;
816         }
817         ret_str = temp_str;
818
819         DEBUG("write_sensu plugin: Successfully created JSON for notification: "
820                                 "host = \"%s\", service = \"%s\", state = \"%s\"",
821                                 n->host, service_buffer, severity);
822         return ret_str;
823 } /* }}} char *sensu_notification_to_json */
824
825 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
826 {
827         int status = 0;
828         size_t  buffer_len;
829
830         status = sensu_connect(host);
831         if (status != 0)
832                 return status;
833
834         buffer_len = strlen(msg);
835
836         status = (int) swrite(host->s, msg, buffer_len);
837         sensu_close_socket(host);
838
839         if (status != 0) {
840                 char errbuf[1024];
841                 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
842                                 (host->node != NULL) ? host->node : SENSU_HOST,
843                                 (host->service != NULL) ? host->service : SENSU_PORT,
844                                 sstrerror(errno, errbuf, sizeof(errbuf)));
845                 return -1;
846         }
847
848         return 0;
849 } /* }}} int sensu_send_msg */
850
851
852 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
853 {
854         int status = 0;
855
856         status = sensu_send_msg(host, msg);
857         if (status != 0) {
858                 host->flags &= ~F_READY;
859                 if (host->res != NULL) {
860                         freeaddrinfo(host->res);
861                         host->res = NULL;
862                 }
863                 return status;
864         }
865
866         return 0;
867 } /* }}} int sensu_send */
868
869
870 static int sensu_write(const data_set_t *ds, /* {{{ */
871               const value_list_t *vl,
872               user_data_t *ud)
873 {
874         int status = 0;
875         int statuses[vl->values_len];
876         struct sensu_host       *host = ud->data;
877         gauge_t *rates = NULL;
878         int i;
879         char *msg;
880
881         pthread_mutex_lock(&host->lock);
882         memset(statuses, 0, vl->values_len * sizeof(*statuses));
883
884         if (host->store_rates) {
885                 rates = uc_get_rate(ds, vl);
886                 if (rates == NULL) {
887                         ERROR("write_sensu plugin: uc_get_rate failed.");
888                         pthread_mutex_unlock(&host->lock);
889                         return -1;
890                 }
891         }
892         for (i = 0; i < (size_t) vl->values_len; i++) {
893                 msg = sensu_value_to_json(host, ds, vl, (int) i, rates, statuses[i]);
894                 if (msg == NULL) {
895                         sfree(rates);
896                         pthread_mutex_unlock(&host->lock);
897                         return -1;
898                 }
899                 status = sensu_send(host, msg);
900                 free(msg);
901                 if (status != 0) {
902                         ERROR("write_sensu plugin: sensu_send failed with status %i", status);
903                         pthread_mutex_unlock(&host->lock);
904                         sfree(rates);
905                         return status;
906                 }
907         }
908         sfree(rates);
909         pthread_mutex_unlock(&host->lock);
910         return status;
911 } /* }}} int sensu_write */
912
913 static int sensu_notification(const notification_t *n, user_data_t *ud) /* {{{ */
914 {
915         int     status;
916         struct sensu_host *host = ud->data;
917         char *msg;
918
919         pthread_mutex_lock(&host->lock);
920
921         msg = sensu_notification_to_json(host, n);
922         if (msg == NULL) {
923                 pthread_mutex_unlock(&host->lock);
924                 return -1;
925         }
926
927         status = sensu_send(host, msg);
928         free(msg);
929         if (status != 0)
930                 ERROR("write_sensu plugin: sensu_send failed with status %i", status);
931         pthread_mutex_unlock(&host->lock);
932
933         return status;
934 } /* }}} int sensu_notification */
935
936 static void sensu_free(void *p) /* {{{ */
937 {
938         struct sensu_host *host = p;
939
940         if (host == NULL)
941                 return;
942
943         pthread_mutex_lock(&host->lock);
944
945         host->reference_count--;
946         if (host->reference_count > 0) {
947                 pthread_mutex_unlock(&host->lock);
948                 return;
949         }
950
951         sensu_close_socket(host);
952         if (host->res != NULL) {
953                 freeaddrinfo(host->res);
954                 host->res = NULL;
955         }
956         sfree(host->service);
957         sfree(host->event_service_prefix);
958         sfree(host->name);
959         sfree(host->node);
960         sfree(host->separator);
961         free_str_list(&(host->metric_handlers));
962         free_str_list(&(host->notification_handlers));
963         pthread_mutex_destroy(&host->lock);
964         sfree(host);
965 } /* }}} void sensu_free */
966
967
968 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
969 {
970         struct sensu_host       *host = NULL;
971         int                                     status = 0;
972         int                                     i;
973         oconfig_item_t          *child;
974         char                            callback_name[DATA_MAX_NAME_LEN];
975         user_data_t                     ud;
976
977         if ((host = calloc(1, sizeof(*host))) == NULL) {
978                 ERROR("write_sensu plugin: calloc failed.");
979                 return ENOMEM;
980         }
981         pthread_mutex_init(&host->lock, NULL);
982         host->reference_count = 1;
983         host->node = NULL;
984         host->service = NULL;
985         host->notifications = 0;
986         host->metrics = 0;
987         host->store_rates = 1;
988         host->always_append_ds = 0;
989         host->metric_handlers.nb_strs = 0;
990         host->metric_handlers.strs = NULL;
991         host->notification_handlers.nb_strs = 0;
992         host->notification_handlers.strs = NULL;
993         host->separator = strdup("/");
994         if (host->separator == NULL) {
995                 ERROR(alloc_err);
996                 sensu_free(host);
997                 return -1;
998         }
999
1000         status = cf_util_get_string(ci, &host->name);
1001         if (status != 0) {
1002                 WARNING("write_sensu plugin: Required host name is missing.");
1003                 sensu_free(host);
1004                 return -1;
1005         }
1006
1007         for (i = 0; i < ci->children_num; i++) {
1008                 child = &ci->children[i];
1009                 status = 0;
1010
1011                 if (strcasecmp("Host", child->key) == 0) {
1012                         status = cf_util_get_string(child, &host->node);
1013                         if (status != 0)
1014                                 break;
1015                 } else if (strcasecmp("Notifications", child->key) == 0) {
1016                         status = cf_util_get_boolean(child, &host->notifications);
1017                         if (status != 0)
1018                                 break;
1019                 } else if (strcasecmp("Metrics", child->key) == 0) {
1020                                         status = cf_util_get_boolean(child, &host->metrics);
1021                                         if (status != 0)
1022                                                 break;
1023                 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1024                         status = cf_util_get_string(child, &host->event_service_prefix);
1025                         if (status != 0)
1026                                 break;
1027                 } else if (strcasecmp("Separator", child->key) == 0) {
1028                                 status = cf_util_get_string(child, &host->separator);
1029                                 if (status != 0)
1030                                         break;
1031                 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1032                         char *temp_str = NULL;
1033                         status = cf_util_get_string(child, &temp_str);
1034                         if (status != 0)
1035                                 break;
1036                         status = add_str_to_list(&(host->metric_handlers), temp_str);
1037                         free(temp_str);
1038                         if (status != 0)
1039                                 break;
1040                 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1041                         char *temp_str = NULL;
1042                         status = cf_util_get_string(child, &temp_str);
1043                         if (status != 0)
1044                                 break;
1045                         status = add_str_to_list(&(host->notification_handlers), temp_str);
1046                         free(temp_str);
1047                         if (status != 0)
1048                                 break;
1049                 } else if (strcasecmp("Port", child->key) == 0) {
1050                         status = cf_util_get_service(child, &host->service);
1051                         if (status != 0) {
1052                                 ERROR("write_sensu plugin: Invalid argument "
1053                                                 "configured for the \"Port\" "
1054                                                 "option.");
1055                                 break;
1056                         }
1057                 } else if (strcasecmp("StoreRates", child->key) == 0) {
1058                         status = cf_util_get_boolean(child, &host->store_rates);
1059                         if (status != 0)
1060                                 break;
1061                 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1062                         status = cf_util_get_boolean(child,
1063                                         &host->always_append_ds);
1064                         if (status != 0)
1065                                 break;
1066                 } else {
1067                         WARNING("write_sensu plugin: ignoring unknown config "
1068                                 "option: \"%s\"", child->key);
1069                 }
1070         }
1071         if (status != 0) {
1072                 sensu_free(host);
1073                 return status;
1074         }
1075
1076         if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1077                         sensu_free(host);
1078                         WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. Giving up.");
1079                         return -1;
1080                 }
1081
1082         if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1083                 sensu_free(host);
1084                 WARNING("write_sensu plugin: notifications enabled but no NotificationHandler defined. Giving up.");
1085                 return -1;
1086         }
1087
1088         if ((host->notification_handlers.nb_strs > 0) && (host->notifications == 0)) {
1089                 WARNING("write_sensu plugin: NotificationHandler given so forcing notifications to be enabled");
1090                 host->notifications = 1;
1091         }
1092
1093         if ((host->metric_handlers.nb_strs > 0) && (host->metrics == 0)) {
1094                 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be enabled");
1095                 host->metrics = 1;
1096         }
1097
1098         if (!(host->notifications || host->metrics)) {
1099                 WARNING("write_sensu plugin: neither metrics nor notifications enabled. Giving up.");
1100                 sensu_free(host);
1101                 return -1;
1102         }
1103
1104         ssnprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1105         ud.data = host;
1106         ud.free_func = sensu_free;
1107
1108         pthread_mutex_lock(&host->lock);
1109
1110         if (host->metrics) {
1111                 status = plugin_register_write(callback_name, sensu_write, &ud);
1112                 if (status != 0)
1113                         WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1114                                         "failed with status %i.",
1115                                         callback_name, status);
1116                 else /* success */
1117                         host->reference_count++;
1118         }
1119
1120         if (host->notifications) {
1121                 status = plugin_register_notification(callback_name, sensu_notification, &ud);
1122                 if (status != 0)
1123                         WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1124                                         "failed with status %i.",
1125                                         callback_name, status);
1126                 else
1127                         host->reference_count++;
1128         }
1129
1130         if (host->reference_count <= 1) {
1131                 /* Both callbacks failed => free memory.
1132                  * We need to unlock here, because sensu_free() will lock.
1133                  * This is not a race condition, because we're the only one
1134                  * holding a reference. */
1135                 pthread_mutex_unlock(&host->lock);
1136                 sensu_free(host);
1137                 return -1;
1138         }
1139
1140         host->reference_count--;
1141         pthread_mutex_unlock(&host->lock);
1142
1143         return status;
1144 } /* }}} int sensu_config_node */
1145
1146 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1147 {
1148         int              i;
1149         oconfig_item_t  *child;
1150         int              status;
1151         struct str_list sensu_tags_arr;
1152
1153         sensu_tags_arr.nb_strs = 0;
1154         sensu_tags_arr.strs = NULL;
1155         sensu_tags = malloc(sizeof(char));
1156         if (sensu_tags == NULL) {
1157                 ERROR(alloc_err);
1158                 return -1;
1159         }
1160         sensu_tags[0] = '\0';
1161
1162         for (i = 0; i < ci->children_num; i++)  {
1163                 child = &ci->children[i];
1164
1165                 if (strcasecmp("Node", child->key) == 0) {
1166                         sensu_config_node(child);
1167                 } else if (strcasecmp(child->key, "attribute") == 0) {
1168                         char *key = NULL;
1169                         char *val = NULL;
1170
1171                         if (child->values_num != 2) {
1172                                 WARNING("sensu attributes need both a key and a value.");
1173                                 free(sensu_tags);
1174                                 return -1;
1175                         }
1176                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
1177                             child->values[1].type != OCONFIG_TYPE_STRING) {
1178                                 WARNING("sensu attribute needs string arguments.");
1179                                 free(sensu_tags);
1180                                 return -1;
1181                         }
1182                         if ((key = strdup(child->values[0].value.string)) == NULL) {
1183                                 ERROR(alloc_err);
1184                                 free(sensu_tags);
1185                                 return -1;
1186                         }
1187                         if ((val = strdup(child->values[1].value.string)) == NULL) {
1188                                 free(sensu_tags);
1189                                 free(key);
1190                                 ERROR(alloc_err);
1191                                 return -1;
1192                         }
1193                         strarray_add(&sensu_attrs, &sensu_attrs_num, key);
1194                         strarray_add(&sensu_attrs, &sensu_attrs_num, val);
1195                         DEBUG("write_sensu: got attr: %s => %s", key, val);
1196                         sfree(key);
1197                         sfree(val);
1198                 } else if (strcasecmp(child->key, "tag") == 0) {
1199                         char *tmp = NULL;
1200                         status = cf_util_get_string(child, &tmp);
1201                         if (status != 0)
1202                                 continue;
1203
1204                         status = add_str_to_list(&sensu_tags_arr, tmp);
1205                         sfree(tmp);
1206                         if (status != 0)
1207                                 continue;
1208                         DEBUG("write_sensu plugin: Got tag: %s", tmp);
1209                 } else {
1210                         WARNING("write_sensu plugin: Ignoring unknown "
1211                                  "configuration option \"%s\" at top level.",
1212                                  child->key);
1213                 }
1214         }
1215         if (sensu_tags_arr.nb_strs > 0) {
1216                 free(sensu_tags);
1217                 sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1218                 free_str_list(&sensu_tags_arr);
1219                 if (sensu_tags == NULL) {
1220                         ERROR(alloc_err);
1221                         return -1;
1222                 }
1223         }
1224         return 0;
1225 } /* }}} int sensu_config */
1226
1227 void module_register(void)
1228 {
1229         plugin_register_complex_config("write_sensu", sensu_config);
1230 }
1231
1232 /* vim: set sw=8 sts=8 ts=8 noet : */