take into account PR comments from @octo
[collectd.git] / src / riemann.c
1 /*
2  * collectd - src/riemann.c
3  *
4  * Copyright (C) 2012  Pierre-Yves Ritschard <pyr@spootnik.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  */
19
20 #include "collectd.h"
21 #include "plugin.h"
22 #include "common.h"
23 #include "configfile.h"
24 #include "riemann.pb-c.h"
25
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
28 #include <errno.h>
29 #include <netdb.h>
30 #include <inttypes.h>
31 #include <pthread.h>
32
/* Default "Delay" option: seconds added to each event's TTL. */
#define RIEMANN_DELAY           1
/* Default "Port" option. */
#define RIEMANN_PORT            5555
/* Maximum number of user-configured "Tag" options. */
#define RIEMANN_EXTRA_TAGS      32
/*
 * riemann_write() attaches up to six built-in tags per event (data-source
 * type, plugin, data-source name, plugin instance, type, type instance)
 * before the user-configured extra tags, so the per-event tag array has to
 * hold both groups.
 */
#define RIEMANN_BUILTIN_TAGS    6
#define RIEMANN_MAX_TAGS        (RIEMANN_BUILTIN_TAGS + RIEMANN_EXTRA_TAGS)
37
38 struct riemann_host {
39 #define F_CONNECT                0x01
40         u_int8_t                 flags;
41         pthread_mutex_t          lock;
42         int                      delay;
43         char                     name[DATA_MAX_NAME_LEN];
44         int                      port;
45         int                      s;
46 };
47
48 struct riemann_event {
49         Event            ev;
50         char             service[DATA_MAX_NAME_LEN];
51         const char      *tags[RIEMANN_MAX_TAGS];
52 };
53
54 char    *riemann_tags[RIEMANN_EXTRA_TAGS];
55 int      riemann_tagcount;
56
57 int     riemann_send(struct riemann_host *, Msg *);
58 int     riemann_notification(const notification_t *, user_data_t *);
59 int     riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
60 int     riemann_connect(struct riemann_host *);
61 void    riemann_free(void *);
62 int     riemann_config_host(oconfig_item_t *);
63 int     riemann_config(oconfig_item_t *);
64 void    module_register(void);
65
66 int
67 riemann_send(struct riemann_host *host, Msg *msg)
68 {
69         u_char                  *buf;
70         size_t                   len;
71
72         len = msg__get_packed_size(msg);
73         DEBUG("riemann_write: packed size computed: %ld", len);
74         if ((buf = calloc(1, len)) == NULL) {
75                 WARNING("riemann_write: failing to alloc buf!");
76                 return ENOMEM;
77         }
78
79         msg__pack(msg, buf);
80
81         if (write(host->s, buf, len) != len) {
82                 host->flags &= ~F_CONNECT;
83                 WARNING("riemann_write: could not send out full packet");
84                 free(buf);
85                 return -1;
86         }
87         free(buf);
88         return 0;
89 }
90
91 int
92 riemann_notification(const notification_t *n, user_data_t *ud)
93 {
94         int                      i;
95         struct riemann_host     *host = ud->data;
96         Msg                      msg = MSG__INIT;
97         Event                    ev = EVENT__INIT;
98         Event                   *evtab[1];
99         const char              *tags[RIEMANN_MAX_TAGS];
100         char                     service[DATA_MAX_NAME_LEN];
101         notification_meta_t     *meta;
102         struct { 
103                 int              code;
104                 char            *name;
105         }                        severities[] = {
106                 { NOTIF_OKAY,           "ok" },
107                 { NOTIF_WARNING,        "warning" },
108                 { NOTIF_FAILURE,        "critical" },
109                 { -1,                   "unknown" }
110         };
111
112         evtab[0] = &ev;
113         msg.n_events = 1;
114         msg.events = evtab;
115         
116         ev.host = host->name;
117         ev.time = CDTIME_T_TO_TIME_T(n->time);
118         ev.has_time = 1;
119
120         for (i = 0;
121              severities[i].code > 0 && severities[i].code != n->severity;
122              i++)
123                 ;
124         ev.state = severities[i].name;
125
126         ev.n_tags = 2;
127         ev.tags = (char **)tags;
128         tags[0] = n->plugin;
129         tags[1] = "notification";
130         
131         for (i = 0; i < riemann_tagcount; i++)
132                 tags[ev.n_tags++] = riemann_tags[i];
133
134         ssnprintf(service, sizeof(service),
135                   "%s-%s-%s-%s", n->plugin, n->plugin_instance,
136                   n->type, n->type_instance);
137         ev.service = service;
138         ev.description = (char *)n->message;
139         
140         /*
141          * Pull in values from threshold
142          */
143         for (meta = n->meta; 
144              meta != NULL && strcasecmp(meta->name, "CurrentValue") != 0;
145              meta = meta->next)
146                 ;
147
148         if (meta != NULL) {
149                 ev.has_metric_d = 1;
150                 ev.metric_d = meta->nm_value.nm_double;
151         }
152         
153         return riemann_send(host, &msg);
154 }
155
156 int
157 riemann_write(const data_set_t *ds,
158               const value_list_t *vl,
159               user_data_t *ud)
160 {
161         int                      i, j;
162         int                      status;
163         struct riemann_host     *host = ud->data;
164         Msg                      msg = MSG__INIT;
165         Event                   *ev;
166         struct riemann_event    *event_tab, *event;
167
168         if ((status = riemann_connect(host)) != 0)
169                 return status;
170
171         msg.n_events = vl->values_len;
172
173         /*
174          * Get rid of allocations up front
175          */
176         if ((msg.events = calloc(msg.n_events, sizeof(*msg.events))) == NULL ||
177             (event_tab = calloc(msg.n_events, sizeof(*event_tab))) == NULL) {
178                 free(msg.events);
179                 free(event_tab);
180                 return ENOMEM;
181         }
182
183         /*
184          * Now produce valid protobuf structures
185          */
186         for (i = 0; i < vl->values_len; i++) {
187                 event = &event_tab[i];
188                 event__init(&event->ev);
189
190                 ev = &event->ev;
191                 event__init(ev);
192                 ev->host = host->name;
193                 ev->has_time = 1;
194                 ev->time = CDTIME_T_TO_TIME_T(vl->time);
195                 ev->has_ttl = 1;
196                 ev->ttl = CDTIME_T_TO_TIME_T(vl->interval) + host->delay;
197                 ev->n_tags = 3;
198                 ev->tags = (char **)event->tags;
199                 event->tags[0] = DS_TYPE_TO_STRING(ds->ds[i].type);
200                 event->tags[1] = vl->plugin;
201                 event->tags[2] = ds->ds[i].name;
202                 if (vl->plugin_instance && strlen(vl->plugin_instance)) {
203                         event->tags[ev->n_tags++] = vl->plugin_instance;
204                 }
205                 if (vl->type && strlen(vl->type)) {
206                         event->tags[ev->n_tags++] = vl->type;
207                 }
208                 if (vl->type_instance && strlen(vl->type_instance)) {
209                         event->tags[ev->n_tags++] = vl->type_instance;
210                 }
211
212                 /* add user defined extra tags */
213                 for (j = 0; j < riemann_tagcount; j++)
214                         event->tags[ev->n_tags++] = riemann_tags[j];
215
216                 switch (ds->ds[i].type) {
217                 case DS_TYPE_COUNTER:
218                         ev->has_metric_sint64 = 1;
219                         ev->metric_sint64 = vl->values[i].counter;
220                         break;
221                 case DS_TYPE_GAUGE:
222                         ev->has_metric_d = 1;
223                         ev->metric_d = vl->values[i].gauge;
224                         break;
225                 case DS_TYPE_DERIVE:
226                         ev->has_metric_sint64 = 1;
227                         ev->metric_sint64 = vl->values[i].derive;
228                         break;
229                 case DS_TYPE_ABSOLUTE:
230                         ev->has_metric_sint64 = 1;
231                         ev->metric_sint64 = vl->values[i].absolute;
232                         break;
233                 default:
234                         WARNING("riemann_write: unknown metric type: %d",
235                                 ds->ds[i].type);
236                         break;
237                 }
238                 ssnprintf(event->service, sizeof(event->service),
239                           "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance,
240                           vl->type, vl->type_instance, ds->ds[i].name);
241                 ev->service = event->service;
242                 DEBUG("riemann_write: %s ready to send", ev->service);
243                 msg.events[i] = ev;
244         }
245         
246         status = riemann_send(host, &msg);
247         sfree(msg.events);
248         return status;
249 }
250
/*
 * Make sure host->s holds a connected datagram socket to host->name:port.
 *
 * Every address returned by getaddrinfo() is tried in order until one can
 * be opened and connected.  host->lock is held while the connection state
 * (flags, s) is inspected and updated, and it is released on every path.
 *
 * Returns 0 if the host is (or becomes) connected, -1 otherwise.
 */
int
riemann_connect(struct riemann_host *host)
{
        int                      e;
        int                      status = -1;
        struct addrinfo         *ai, *res, hints;
        char                     service[32];

        /* Cheap unlocked peek; the flag is re-checked under the lock below. */
        if (host->flags & F_CONNECT)
                return 0;

        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_DGRAM;

        ssnprintf(service, sizeof(service), "%d", host->port);

        if ((e = getaddrinfo(host->name, service, &hints, &res)) != 0) {
                WARNING("could not resolve host \"%s\": %s",
                        host->name, gai_strerror(e));
                return -1;
        }

        pthread_mutex_lock(&host->lock);
        if (host->flags & F_CONNECT) {
                /* another thread connected while we were resolving */
                status = 0;
        } else {
                for (ai = res; ai != NULL; ai = ai->ai_next) {
                        int fd = socket(ai->ai_family, ai->ai_socktype,
                                        ai->ai_protocol);

                        if (fd == -1) {
                                WARNING("riemann_connect: could not open socket");
                                continue;
                        }
                        if (connect(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
                                close(fd);
                                continue;
                        }
                        host->s = fd;
                        host->flags |= F_CONNECT;
                        DEBUG("got a succesful connection for: %s", host->name);
                        status = 0;
                        break;
                }
        }
        pthread_mutex_unlock(&host->lock);

        /* The outcome is kept in status: ai must not be inspected after this. */
        freeaddrinfo(res);

        if (status != 0)
                WARNING("riemann_connect: no suitable hosts found");
        return status;
}
314
315 void
316 riemann_free(void *p)
317 {
318         struct riemann_host     *host = p;
319
320         if (host->flags & F_CONNECT)
321                 close(host->s);
322         sfree(host);
323 }
324
325 int
326 riemann_config_host(oconfig_item_t *ci)
327 {
328         struct riemann_host     *host = NULL;
329         int                      status = 0;
330         int                      i;
331         oconfig_item_t          *child;
332         char                     w_cb_name[DATA_MAX_NAME_LEN];
333         char                     n_cb_name[DATA_MAX_NAME_LEN];
334         user_data_t              ud;
335
336         if (ci->values_num != 1 ||
337             ci->values[0].type != OCONFIG_TYPE_STRING) {
338                 WARNING("riemann hosts need one string argument");
339                 return -1;
340         }
341
342         if ((host = calloc(1, sizeof (*host))) == NULL) {
343                 WARNING("riemann host allocation failed");
344                 return ENOMEM;
345         }
346
347         if (cf_util_get_string_buffer(ci, host->name,
348                                       sizeof(host->name)) != 0) {
349                 WARNING("riemann host name too long");
350                 sfree(host);
351                 return -1;
352         }
353
354         host->port = RIEMANN_PORT;
355         host->delay = RIEMANN_DELAY;
356         for (i = 0; i < ci->children_num; i++) {
357                 /*
358                  * The code here could be simplified but makes room
359                  * for easy adding of new options later on.
360                  */
361                 child = &ci->children[i];
362                 status = 0;
363
364                 if (strcasecmp(child->key, "port") == 0) {
365                         if ((status = cf_util_get_port_number(child)) < 0) {
366                                 WARNING("invalid port number");
367                                 break;
368                         }
369                         host->port = status;
370                         status = 0;
371                 } else if (strcasecmp(child->key, "delay") == 0) {
372                         if ((status = cf_util_get_int(ci, &host->delay)) != 0)
373                                 break;
374                 } else {
375                         WARNING("riemann plugin: ignoring unknown config "
376                                 "option: \"%s\"", child->key);
377                 }
378         }
379         if (status != 0) {
380                 sfree(host);
381                 return status;
382         }
383
384         pthread_mutex_init(&host->lock, NULL);
385         ssnprintf(w_cb_name, sizeof(w_cb_name), "write-riemann/%s:%d",
386                   host->name, host->port);
387         ssnprintf(n_cb_name, sizeof(n_cb_name), "notification-riemann/%s:%d",
388                   host->name, host->port);
389         DEBUG("riemann w_cb_name: %s", w_cb_name);
390         DEBUG("riemann n_cb_name: %s", n_cb_name);
391         ud.data = host;
392         ud.free_func = riemann_free;
393         
394         if ((status = plugin_register_write(w_cb_name, riemann_write, &ud)) != 0)
395                 riemann_free(host);
396
397         if ((status = plugin_register_notification(n_cb_name,
398                                                    riemann_notification,
399                                                    &ud)) != 0) {
400                 plugin_unregister_write(w_cb_name);
401                 riemann_free(host);
402         }
403         return status;
404 }
405
406 int
407 riemann_config(oconfig_item_t *ci)
408 {
409         int              i;
410         char            *newtag;
411         oconfig_item_t  *child;
412
413         for (i = 0; i < ci->children_num; i++)  {
414                 child = &ci->children[i];
415
416                 if (strcasecmp(child->key, "host") == 0) {
417                         riemann_config_host(child);
418                 } else if (strcasecmp(child->key, "tag") == 0) {
419                         if (riemann_tagcount >= RIEMANN_EXTRA_TAGS) {
420                                 WARNING("riemann plugin: too many tags");
421                                 return -1;
422                         }
423                         newtag = NULL;
424                         cf_util_get_string(child, &newtag);
425                         if (newtag == NULL)
426                                 return -1;
427                         riemann_tags[riemann_tagcount++] = newtag;
428                         DEBUG("riemann_config: got tag: %s", newtag);
429  
430                 } else {
431                         WARNING ("riemann plugin: Ignoring unknown "
432                                  "configuration option \"%s\" at top level.",
433                                  child->key);
434                 }
435         }
436         return (0);
437 }
438
439 void
440 module_register(void)
441 {
442         DEBUG("riemann: module_register");
443         
444         plugin_register_complex_config ("riemann", riemann_config);
445 }