ddbbe9e381a09a6e9f2976ae063422a93901b236
[collectd.git] / src / riemann.c
1 /*
2  * collectd - src/riemann.c
3  *
4  * Copyright (C) 2012  Pierre-Yves Ritschard <pyr@spootnik.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  */
19
20 #include "collectd.h"
21 #include "plugin.h"
22 #include "common.h"
23 #include "configfile.h"
24 #include "riemann.pb-c.h"
25
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
28 #include <errno.h>
29 #include <netdb.h>
30 #include <inttypes.h>
31 #include <pthread.h>
32
/* Default extra time, in seconds, added to each event's TTL. */
#define RIEMANN_DELAY           1
/* Default port used when a host block has no "port" option. */
#define RIEMANN_PORT            5555
/* Maximum number of user defined "tag" options. */
#define RIEMANN_EXTRA_TAGS      32
/*
 * Per-event tag capacity: three tags that are always set (value type,
 * plugin, data source name), up to three optional ones (plugin instance,
 * type, type instance) and every user defined tag.
 */
#define RIEMANN_MAX_TAGS        (3 + 3 + RIEMANN_EXTRA_TAGS)
37
38 struct riemann_host {
39         struct riemann_host     *next;
40 #define F_CONNECT                0x01
41         u_int8_t                 flags;
42         pthread_mutex_t          lock;
43         int                      delay;
44         char                     name[DATA_MAX_NAME_LEN];
45         int                      port;
46         int                      s;
47 };
48
49 struct riemann_event {
50         Event            ev;
51         char             service[DATA_MAX_NAME_LEN];
52         const char      *tags[RIEMANN_MAX_TAGS];
53 };
54
55 char    *riemann_tags[RIEMANN_EXTRA_TAGS];
56 int      riemann_tagcount;
57
58 int     riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
59 int     riemann_connect(struct riemann_host *);
60 void    riemann_free(void *);
61 int     riemann_config_host(oconfig_item_t *);
62 int     riemann_config(oconfig_item_t *);
63 void    module_register(void);
64
65 /*
66  * Functions
67  */
68 int
69 riemann_write(const data_set_t *ds,
70               const value_list_t *vl,
71               user_data_t *ud)
72 {
73         int                      i, j;
74         int                      status;
75         struct riemann_host     *host = ud->data;
76         Msg                      msg = MSG__INIT;
77         Event                   *ev;
78         struct riemann_event    *event_tab, *event;
79         u_char                  *buf;
80         size_t                   len;
81
82         if ((status = riemann_connect(host)) != 0)
83                 return status;
84
85         msg.n_events = vl->values_len;
86
87         /*
88          * Get rid of allocations up front
89          */
90         if ((msg.events = calloc(msg.n_events, sizeof(*msg.events))) == NULL ||
91             (event_tab = calloc(msg.n_events, sizeof(*event_tab))) == NULL) {
92                 free(msg.events);
93                 free(event_tab);
94                 return ENOMEM;
95         }
96
97         /*
98          * Now produce valid protobuf structures
99          */
100         for (i = 0; i < vl->values_len; i++) {
101                 event = &event_tab[i];
102                 event__init(&event->ev);
103
104                 ev = &event->ev;
105                 event__init(ev);
106                 ev->host = host->name;
107                 ev->has_time = 1;
108                 ev->time = CDTIME_T_TO_TIME_T(vl->time);
109                 ev->has_ttl = 1;
110                 ev->ttl = CDTIME_T_TO_TIME_T(vl->interval) + host->delay;
111                 ev->n_tags = 3;
112                 ev->tags = (char **)event->tags;
113                 event->tags[0] = DS_TYPE_TO_STRING(ds->ds[i].type);
114                 event->tags[1] = vl->plugin;
115                 event->tags[2] = ds->ds[i].name;
116                 if (vl->plugin_instance && strlen(vl->plugin_instance)) {
117                         event->tags[ev->n_tags++] = vl->plugin_instance;
118                 }
119                 if (vl->type && strlen(vl->type)) {
120                         event->tags[ev->n_tags++] = vl->type;
121                 }
122                 if (vl->type_instance && strlen(vl->type_instance)) {
123                         event->tags[ev->n_tags++] = vl->type_instance;
124                 }
125
126                 /* add user defined extra tags */
127                 for (j = 0; j < riemann_tagcount; j++)
128                         event->tags[ev->n_tags++] = riemann_tags[j];
129
130                 switch (ds->ds[i].type) {
131                 case DS_TYPE_COUNTER:
132                         ev->has_metric_sint64 = 1;
133                         ev->metric_sint64 = vl->values[i].counter;
134                         break;
135                 case DS_TYPE_GAUGE:
136                         ev->has_metric_d = 1;
137                         ev->metric_d = vl->values[i].gauge;
138                         break;
139                 case DS_TYPE_DERIVE:
140                         ev->has_metric_sint64 = 1;
141                         ev->metric_sint64 = vl->values[i].derive;
142                         break;
143                 case DS_TYPE_ABSOLUTE:
144                         ev->has_metric_sint64 = 1;
145                         ev->metric_sint64 = vl->values[i].absolute;
146                         break;
147                 default:
148                         WARNING("riemann_write: unknown metric type: %d",
149                                 ds->ds[i].type);
150                         break;
151                 }
152                 ssnprintf(event->service, sizeof(event->service),
153                           "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance,
154                           vl->type, vl->type_instance, ds->ds[i].name);
155                 ev->service = event->service;
156                 DEBUG("riemann_write: %s ready to send", ev->service);
157                 msg.events[i] = ev;
158         }
159         
160         /*
161          * we have now packed a bunch of events, let's pack them
162          */
163         len = msg__get_packed_size(&msg);
164         DEBUG("riemann_write: packed size computed: %ld", len);
165         if ((buf = calloc(1, len)) == NULL) {
166                 WARNING("riemann_write: failing to alloc buf!");
167                 sfree(msg.events);
168                 return ENOMEM;
169         }
170
171         /*
172          * prepend full size to beginning of buffer
173          */
174         msg__pack(&msg, buf);
175         sfree(msg.events);
176
177         /*
178          * we're now ready to send
179          */
180         if (write(host->s, buf, len) != len) {
181                 WARNING("riemann_write: could not send out full packet");
182                 return -1;
183         }
184         free(buf);
185         return 0;
186 }
187
/*
 * Make sure the host has a connected datagram socket.
 *
 * The host name is resolved and each returned address is tried in turn
 * until socket() and connect() both succeed. On success the descriptor is
 * stored in host->s and F_CONNECT is set in host->flags. host->lock is
 * held while the connection is set up and is released on every return
 * path.
 *
 * Returns 0 when the host is (already or newly) connected, -1 otherwise.
 */
int
riemann_connect(struct riemann_host *host)
{
        int                      e;
        int                      s;
        int                      status = -1;
        struct addrinfo         *ai, *res, hints;
        struct sockaddr_in      *sin4;
        struct sockaddr_in6     *sin6;

        if (host->flags & F_CONNECT)
                return 0;

        memset(&hints, 0, sizeof(hints));
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_DGRAM;

        if ((e = getaddrinfo(host->name, NULL, &hints, &res)) != 0) {
                WARNING("could not resolve host \"%s\": %s",
                        host->name, gai_strerror(e));
                return -1;
        }

        pthread_mutex_lock(&host->lock);

        /*
         * check if another thread did not already successfully connect
         */
        if (host->flags & F_CONNECT) {
                status = 0;
                goto out;
        }

        for (ai = res; ai != NULL; ai = ai->ai_next) {
                /* no service was given to getaddrinfo(): set the port here */
                switch (ai->ai_family) {
                case AF_INET:
                        sin4 = (struct sockaddr_in *)ai->ai_addr;
                        sin4->sin_port = htons(host->port);
                        break;
                case AF_INET6:
                        sin6 = (struct sockaddr_in6 *)ai->ai_addr;
                        sin6->sin6_port = htons(host->port);
                        break;
                default:
                        WARNING("riemann_connect: unsupported address family");
                        continue;
                }

                if ((s = socket(ai->ai_family, SOCK_DGRAM, 0)) == -1) {
                        WARNING("riemann_connect: could not open socket");
                        continue;
                }

                if (connect(s, ai->ai_addr, ai->ai_addrlen) != 0) {
                        /* this address did not work, try the next one */
                        close(s);
                        continue;
                }

                host->s = s;
                host->flags |= F_CONNECT;
                DEBUG("got a succesful connection for: %s", host->name);
                status = 0;
                break;
        }

        if (status != 0)
                WARNING("riemann_connect: no suitable hosts found");

out:
        pthread_mutex_unlock(&host->lock);
        freeaddrinfo(res);
        return status;
}
264
265 void
266 riemann_free(void *p)
267 {
268         struct riemann_host     *host = p;
269
270         if (host->flags & F_CONNECT)
271                 close(host->s);
272         sfree(host);
273 }
274
275 int
276 riemann_config_host(oconfig_item_t *ci)
277 {
278         struct riemann_host     *host = NULL;
279         int                      status = 0;
280         int                      i;
281         oconfig_item_t          *child;
282         char                     cb_name[DATA_MAX_NAME_LEN];
283         user_data_t              ud;
284
285         if (ci->values_num != 1 ||
286             ci->values[0].type != OCONFIG_TYPE_STRING) {
287                 WARNING("riemann hosts need one string argument");
288                 return -1;
289         }
290
291         if ((host = calloc(1, sizeof (*host))) == NULL) {
292                 WARNING("riemann host allocation failed");
293                 return ENOMEM;
294         }
295
296         if (cf_util_get_string_buffer(ci, host->name,
297                                       sizeof(host->name)) != 0) {
298                 WARNING("riemann host name too long");
299                 sfree(host);
300                 return -1;
301         }
302
303         host->port = RIEMANN_PORT;
304         host->delay = RIEMANN_DELAY;
305         for (i = 0; i < ci->children_num; i++) {
306                 /*
307                  * The code here could be simplified but makes room
308                  * for easy adding of new options later on.
309                  */
310                 child = &ci->children[i];
311                 status = 0;
312
313                 if (strcasecmp(child->key, "port") == 0) {
314                         if ((status = cf_util_get_port_number(child)) < 0) {
315                                 WARNING("invalid port number");
316                                 break;
317                         }
318                         host->port = status;
319                         status = 0;
320                 } else if (strcasecmp(child->key, "delay") == 0) {
321                         if ((status = cf_util_get_int(ci, &host->delay)) != 0)
322                                 break;
323                 } else {
324                         WARNING("riemann plugin: ignoring unknown config "
325                                 "option: \"%s\"", child->key);
326                 }
327         }
328         if (status != 0) {
329                 sfree(host);
330                 return status;
331         }
332
333         pthread_mutex_init(&host->lock, NULL);
334         ssnprintf(cb_name, sizeof(cb_name), "riemann/%s:%d", host->name, host->port);
335         DEBUG("riemann cb_name: %s", cb_name);
336         ud.data = host;
337         ud.free_func = riemann_free;
338         
339         if ((status = plugin_register_write(cb_name, riemann_write, &ud)) != 0)
340                 riemann_free(host);
341         
342         return status;
343 }
344
345 int
346 riemann_config(oconfig_item_t *ci)
347 {
348         int              i;
349         char            *newtag;
350         oconfig_item_t  *child;
351
352         for (i = 0; i < ci->children_num; i++)  {
353                 child = &ci->children[i];
354
355                 if (strcasecmp(child->key, "host") == 0) {
356                         riemann_config_host(child);
357                 } else if (strcasecmp(child->key, "tag") == 0) {
358                         if (riemann_tagcount >= RIEMANN_EXTRA_TAGS) {
359                                 WARNING("riemann plugin: too many tags");
360                                 return -1;
361                         }
362                         newtag = NULL;
363                         cf_util_get_string(child, &newtag);
364                         if (newtag == NULL)
365                                 return -1;
366                         riemann_tags[riemann_tagcount++] = newtag;
367                         DEBUG("riemann_config: got tag: %s", newtag);
368  
369                 } else {
370                         WARNING ("riemann plugin: Ignoring unknown "
371                                  "configuration option \"%s\" at top level.",
372                                  child->key);
373                 }
374         }
375         return (0);
376 }
377
378 void
379 module_register(void)
380 {
381         DEBUG("riemann: module_register");
382         
383         plugin_register_complex_config ("riemann", riemann_config);
384 }