add support for notifications as well
[collectd.git] / src / riemann.c
1 /*
2  * collectd - src/riemann.c
3  *
4  * Copyright (C) 2012  Pierre-Yves Ritschard <pyr@spootnik.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  */
19
20 #include "collectd.h"
21 #include "plugin.h"
22 #include "common.h"
23 #include "configfile.h"
24 #include "riemann.pb-c.h"
25
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
28 #include <errno.h>
29 #include <netdb.h>
30 #include <inttypes.h>
31 #include <pthread.h>
32
/* Default number of seconds added to an event's TTL (see riemann_write). */
#define RIEMANN_DELAY           1
/* Default port used when a host block has no "port" option. */
#define RIEMANN_PORT            5555
/* Maximum number of user-defined "tag" options accepted by riemann_config. */
#define RIEMANN_EXTRA_TAGS      32
/*
 * Worst-case number of tags riemann_write attaches by itself: data source
 * type, plugin, data source name, plugin instance, type and type instance.
 */
#define RIEMANN_FIXED_TAGS      6
/*
 * Capacity of every per-event tag array.  It has to hold the fixed tags and
 * all user-defined ones; the previous hard-coded value (37) was one slot
 * short of that worst case.
 */
#define RIEMANN_MAX_TAGS        (RIEMANN_EXTRA_TAGS + RIEMANN_FIXED_TAGS)
37
38 struct riemann_host {
39         struct riemann_host     *next;
40 #define F_CONNECT                0x01
41         u_int8_t                 flags;
42         pthread_mutex_t          lock;
43         int                      delay;
44         char                     name[DATA_MAX_NAME_LEN];
45         int                      port;
46         int                      s;
47 };
48
49 struct riemann_event {
50         Event            ev;
51         char             service[DATA_MAX_NAME_LEN];
52         const char      *tags[RIEMANN_MAX_TAGS];
53 };
54
55 char    *riemann_tags[RIEMANN_EXTRA_TAGS];
56 int      riemann_tagcount;
57
58 int     riemann_send(struct riemann_host *, Msg *);
59 int     riemann_notification(const notification_t *, user_data_t *);
60 int     riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
61 int     riemann_connect(struct riemann_host *);
62 void    riemann_free(void *);
63 int     riemann_config_host(oconfig_item_t *);
64 int     riemann_config(oconfig_item_t *);
65 void    module_register(void);
66
67 int
68 riemann_send(struct riemann_host *host, Msg *msg)
69 {
70         u_char                  *buf;
71         size_t                   len;
72
73         len = msg__get_packed_size(msg);
74         DEBUG("riemann_write: packed size computed: %ld", len);
75         if ((buf = calloc(1, len)) == NULL) {
76                 WARNING("riemann_write: failing to alloc buf!");
77                 return ENOMEM;
78         }
79
80         msg__pack(msg, buf);
81
82         if (write(host->s, buf, len) != len) {
83                 WARNING("riemann_write: could not send out full packet");
84                 free(buf);
85                 return -1;
86         }
87         free(buf);
88         return 0;
89 }
90
91 int
92 riemann_notification(const notification_t *n, user_data_t *ud)
93 {
94         int                      i;
95         struct riemann_host     *host = ud->data;
96         Msg                      msg = MSG__INIT;
97         Event                    ev = EVENT__INIT;
98         Event                   *evtab[1];
99         const char              *tags[RIEMANN_MAX_TAGS];
100         char                     service[DATA_MAX_NAME_LEN];
101         notification_meta_t     *meta;
102         struct { 
103                 int              code;
104                 char            *name;
105         }                        severities[] = {
106                 { NOTIF_OKAY,           "ok" },
107                 { NOTIF_WARNING,        "warning" },
108                 { NOTIF_FAILURE,        "critical" },
109                 { -1,                   "unknown" }
110         };
111
112         evtab[0] = &ev;
113         msg.n_events = 1;
114         msg.events = evtab;
115         
116         ev.host = host->name;
117         ev.time = CDTIME_T_TO_TIME_T(n->time);
118         ev.has_time = 1;
119
120         for (i = 0;
121              severities[i].code > 0 && severities[i].code != n->severity;
122              i++)
123                 ;
124         ev.state = severities[i].name;
125
126         ev.n_tags = 2;
127         ev.tags = (char **)tags;
128         tags[0] = n->plugin;
129         tags[1] = "notification";
130         
131         for (i = 0; i < riemann_tagcount; i++)
132                 tags[ev.n_tags++] = riemann_tags[i];
133
134         ssnprintf(service, sizeof(service),
135                   "%s-%s-%s-%s", n->plugin, n->plugin_instance,
136                   n->type, n->type_instance);
137         ev.service = service;
138         ev.description = (char *)n->message;
139         
140         /*
141          * Pull in values from threshold
142          */
143         for (meta = n->meta; 
144              meta != NULL && strcasecmp(meta->name, "CurrentValue") != 0;
145              meta = meta->next)
146                 ;
147
148         if (meta != NULL) {
149                 ev.has_metric_d = 1;
150                 ev.metric_d = meta->nm_value.nm_double;
151         }
152         
153         return riemann_send(host, &msg);
154 }
155
156 int
157 riemann_write(const data_set_t *ds,
158               const value_list_t *vl,
159               user_data_t *ud)
160 {
161         int                      i, j;
162         int                      status;
163         struct riemann_host     *host = ud->data;
164         Msg                      msg = MSG__INIT;
165         Event                   *ev;
166         struct riemann_event    *event_tab, *event;
167
168         if ((status = riemann_connect(host)) != 0)
169                 return status;
170
171         msg.n_events = vl->values_len;
172
173         /*
174          * Get rid of allocations up front
175          */
176         if ((msg.events = calloc(msg.n_events, sizeof(*msg.events))) == NULL ||
177             (event_tab = calloc(msg.n_events, sizeof(*event_tab))) == NULL) {
178                 free(msg.events);
179                 free(event_tab);
180                 return ENOMEM;
181         }
182
183         /*
184          * Now produce valid protobuf structures
185          */
186         for (i = 0; i < vl->values_len; i++) {
187                 event = &event_tab[i];
188                 event__init(&event->ev);
189
190                 ev = &event->ev;
191                 event__init(ev);
192                 ev->host = host->name;
193                 ev->has_time = 1;
194                 ev->time = CDTIME_T_TO_TIME_T(vl->time);
195                 ev->has_ttl = 1;
196                 ev->ttl = CDTIME_T_TO_TIME_T(vl->interval) + host->delay;
197                 ev->n_tags = 3;
198                 ev->tags = (char **)event->tags;
199                 event->tags[0] = DS_TYPE_TO_STRING(ds->ds[i].type);
200                 event->tags[1] = vl->plugin;
201                 event->tags[2] = ds->ds[i].name;
202                 if (vl->plugin_instance && strlen(vl->plugin_instance)) {
203                         event->tags[ev->n_tags++] = vl->plugin_instance;
204                 }
205                 if (vl->type && strlen(vl->type)) {
206                         event->tags[ev->n_tags++] = vl->type;
207                 }
208                 if (vl->type_instance && strlen(vl->type_instance)) {
209                         event->tags[ev->n_tags++] = vl->type_instance;
210                 }
211
212                 /* add user defined extra tags */
213                 for (j = 0; j < riemann_tagcount; j++)
214                         event->tags[ev->n_tags++] = riemann_tags[j];
215
216                 switch (ds->ds[i].type) {
217                 case DS_TYPE_COUNTER:
218                         ev->has_metric_sint64 = 1;
219                         ev->metric_sint64 = vl->values[i].counter;
220                         break;
221                 case DS_TYPE_GAUGE:
222                         ev->has_metric_d = 1;
223                         ev->metric_d = vl->values[i].gauge;
224                         break;
225                 case DS_TYPE_DERIVE:
226                         ev->has_metric_sint64 = 1;
227                         ev->metric_sint64 = vl->values[i].derive;
228                         break;
229                 case DS_TYPE_ABSOLUTE:
230                         ev->has_metric_sint64 = 1;
231                         ev->metric_sint64 = vl->values[i].absolute;
232                         break;
233                 default:
234                         WARNING("riemann_write: unknown metric type: %d",
235                                 ds->ds[i].type);
236                         break;
237                 }
238                 ssnprintf(event->service, sizeof(event->service),
239                           "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance,
240                           vl->type, vl->type_instance, ds->ds[i].name);
241                 ev->service = event->service;
242                 DEBUG("riemann_write: %s ready to send", ev->service);
243                 msg.events[i] = ev;
244         }
245         
246         status = riemann_send(host, &msg);
247         sfree(msg.events);
248         return status;
249 }
250
/*
 * Make sure host->s is a connected datagram socket.
 *
 * The host name is resolved and every returned address is tried in turn
 * until one can be connected.  On success F_CONNECT is set in host->flags
 * and later calls return immediately.
 *
 * Returns 0 when the host is (already) connected, -1 when the name cannot
 * be resolved or none of its addresses can be connected.
 */
int
riemann_connect(struct riemann_host *host)
{
        int                      e;
        int                      fd;
        struct addrinfo         *ai, *res, hints;
        struct sockaddr_in      *sin4;
        struct sockaddr_in6     *sin6;

        /* unlocked fast path; the flag is checked again under the lock */
        if (host->flags & F_CONNECT)
                return 0;

        memset(&hints, 0, sizeof(hints));
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_DGRAM;

        if ((e = getaddrinfo(host->name, NULL, &hints, &res)) != 0) {
                WARNING("could not resolve host \"%s\": %s",
                        host->name, gai_strerror(e));
                return -1;
        }

        pthread_mutex_lock(&host->lock);

        /*
         * check if another thread did not already successfully connect
         */
        if (host->flags & F_CONNECT) {
                pthread_mutex_unlock(&host->lock);
                freeaddrinfo(res);
                return 0;
        }

        for (ai = res; ai != NULL; ai = ai->ai_next) {
                /* no service was given to getaddrinfo(): fill in the port */
                if (ai->ai_family == AF_INET) {
                        sin4 = (struct sockaddr_in *)ai->ai_addr;
                        sin4->sin_port = htons((uint16_t)host->port);
                } else if (ai->ai_family == AF_INET6) {
                        sin6 = (struct sockaddr_in6 *)ai->ai_addr;
                        sin6->sin6_port = htons((uint16_t)host->port);
                } else {
                        WARNING("riemann_connect: unsupported address family");
                        continue;
                }

                if ((fd = socket(ai->ai_family, SOCK_DGRAM, 0)) == -1) {
                        WARNING("riemann_connect: could not open socket");
                        continue;
                }

                if (connect(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
                        /* this address is unusable, try the next one */
                        close(fd);
                        continue;
                }

                host->s = fd;
                host->flags |= F_CONNECT;
                DEBUG("got a succesful connection for: %s", host->name);
                break;
        }

        pthread_mutex_unlock(&host->lock);
        freeaddrinfo(res);

        /* the loop ran off the end of the list without connecting */
        if (ai == NULL) {
                WARNING("riemann_connect: no suitable hosts found");
                return -1;
        }

        return 0;
}
327
328 void
329 riemann_free(void *p)
330 {
331         struct riemann_host     *host = p;
332
333         if (host->flags & F_CONNECT)
334                 close(host->s);
335         sfree(host);
336 }
337
338 int
339 riemann_config_host(oconfig_item_t *ci)
340 {
341         struct riemann_host     *host = NULL;
342         int                      status = 0;
343         int                      i;
344         oconfig_item_t          *child;
345         char                     w_cb_name[DATA_MAX_NAME_LEN];
346         char                     n_cb_name[DATA_MAX_NAME_LEN];
347         user_data_t              ud;
348
349         if (ci->values_num != 1 ||
350             ci->values[0].type != OCONFIG_TYPE_STRING) {
351                 WARNING("riemann hosts need one string argument");
352                 return -1;
353         }
354
355         if ((host = calloc(1, sizeof (*host))) == NULL) {
356                 WARNING("riemann host allocation failed");
357                 return ENOMEM;
358         }
359
360         if (cf_util_get_string_buffer(ci, host->name,
361                                       sizeof(host->name)) != 0) {
362                 WARNING("riemann host name too long");
363                 sfree(host);
364                 return -1;
365         }
366
367         host->port = RIEMANN_PORT;
368         host->delay = RIEMANN_DELAY;
369         for (i = 0; i < ci->children_num; i++) {
370                 /*
371                  * The code here could be simplified but makes room
372                  * for easy adding of new options later on.
373                  */
374                 child = &ci->children[i];
375                 status = 0;
376
377                 if (strcasecmp(child->key, "port") == 0) {
378                         if ((status = cf_util_get_port_number(child)) < 0) {
379                                 WARNING("invalid port number");
380                                 break;
381                         }
382                         host->port = status;
383                         status = 0;
384                 } else if (strcasecmp(child->key, "delay") == 0) {
385                         if ((status = cf_util_get_int(ci, &host->delay)) != 0)
386                                 break;
387                 } else {
388                         WARNING("riemann plugin: ignoring unknown config "
389                                 "option: \"%s\"", child->key);
390                 }
391         }
392         if (status != 0) {
393                 sfree(host);
394                 return status;
395         }
396
397         pthread_mutex_init(&host->lock, NULL);
398         ssnprintf(w_cb_name, sizeof(w_cb_name), "write-riemann/%s:%d",
399                   host->name, host->port);
400         ssnprintf(n_cb_name, sizeof(n_cb_name), "notification-riemann/%s:%d",
401                   host->name, host->port);
402         DEBUG("riemann w_cb_name: %s", w_cb_name);
403         DEBUG("riemann n_cb_name: %s", n_cb_name);
404         ud.data = host;
405         ud.free_func = riemann_free;
406         
407         if ((status = plugin_register_write(w_cb_name, riemann_write, &ud)) != 0)
408                 riemann_free(host);
409
410         if ((status = plugin_register_notification(n_cb_name,
411                                                    riemann_notification,
412                                                    &ud)) != 0) {
413                 plugin_unregister_write(w_cb_name);
414                 riemann_free(host);
415         }
416         return status;
417 }
418
419 int
420 riemann_config(oconfig_item_t *ci)
421 {
422         int              i;
423         char            *newtag;
424         oconfig_item_t  *child;
425
426         for (i = 0; i < ci->children_num; i++)  {
427                 child = &ci->children[i];
428
429                 if (strcasecmp(child->key, "host") == 0) {
430                         riemann_config_host(child);
431                 } else if (strcasecmp(child->key, "tag") == 0) {
432                         if (riemann_tagcount >= RIEMANN_EXTRA_TAGS) {
433                                 WARNING("riemann plugin: too many tags");
434                                 return -1;
435                         }
436                         newtag = NULL;
437                         cf_util_get_string(child, &newtag);
438                         if (newtag == NULL)
439                                 return -1;
440                         riemann_tags[riemann_tagcount++] = newtag;
441                         DEBUG("riemann_config: got tag: %s", newtag);
442  
443                 } else {
444                         WARNING ("riemann plugin: Ignoring unknown "
445                                  "configuration option \"%s\" at top level.",
446                                  child->key);
447                 }
448         }
449         return (0);
450 }
451
452 void
453 module_register(void)
454 {
455         DEBUG("riemann: module_register");
456         
457         plugin_register_complex_config ("riemann", riemann_config);
458 }