statsd plugin: Initial implementation.
[collectd.git] / src / statsd.c
1 /**
2  * collectd - src/statsd.c
3  *
4  * Copyright (C) 2013       Florian octo Forster
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  * Authors:
19  *   Florian octo Forster <octo at collectd.org>
20  */
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_avltree.h"
27 #include "utils_complain.h"
28
29 #include <pthread.h>
30
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <netdb.h>
34 #include <poll.h>
35
36 #ifndef STATSD_DEFAULT_NODE
37 # define STATSD_DEFAULT_NODE NULL
38 #endif
39
40 #ifndef STATSD_DEFAULT_SERVICE
41 # define STATSD_DEFAULT_SERVICE "8125"
42 #endif
43
44 enum metric_type_e
45 {
46   STATSD_COUNTER,
47   STATSD_TIMER,
48   STATSD_GAUGE
49 };
50 typedef enum metric_type_e metric_type_t;
51
52 struct statsd_metric_s
53 {
54   metric_type_t type;
55   int64_t value;
56   cdtime_t last_update;
57 };
58 typedef struct statsd_metric_s statsd_metric_t;
59
60 static c_avl_tree_t   *metrics_tree = NULL;
61 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
62
63 static pthread_t network_thread;
64 static _Bool     network_thread_running = 0;
65 static _Bool     network_thread_shutdown = 0;
66
67 static char *conf_node = NULL;
68 static char *conf_service = NULL;
69
70 /* Must hold metrics_lock when calling this function. */
71 static int statsd_metric_set_unsafe (char const *name, int64_t value, /* {{{ */
72     metric_type_t type)
73 {
74   cdtime_t now;
75   statsd_metric_t *metric;
76   char *key;
77   int status;
78
79   now = cdtime ();
80
81   status = c_avl_get (metrics_tree, name, (void *) &metric);
82   if (status == 0)
83   {
84     metric->value = value;
85     metric->last_update = now;
86
87     return (0);
88   }
89
90   key = strdup (name);
91   metric = calloc (1, sizeof (*metric));
92   if ((key == NULL) || (metric == NULL))
93   {
94     sfree (key);
95     sfree (metric);
96     return (-1);
97   }
98
99   metric->type = type;
100   metric->value = value;
101   metric->last_update = now;
102
103   status = c_avl_insert (metrics_tree, key, metric);
104   if (status != 0)
105   {
106     sfree (key);
107     sfree (metric);
108
109     return (-1);
110   }
111
112   return (0);
113 } /* }}} int statsd_metric_set_unsafe */
114
115 static int statsd_metric_set (char const *name, int64_t value, /* {{{ */
116     metric_type_t type)
117 {
118   int status;
119
120   pthread_mutex_lock (&metrics_lock);
121   status = statsd_metric_set_unsafe (name, value, type);
122   pthread_mutex_unlock (&metrics_lock);
123
124   return (status);
125 } /* }}} int statsd_metric_set */
126
127 static int statsd_metric_add (char const *name, int64_t delta, /* {{{ */
128     metric_type_t type)
129 {
130   cdtime_t now;
131   statsd_metric_t *metric;
132   int status;
133
134   now = cdtime ();
135   pthread_mutex_lock (&metrics_lock);
136
137   status = c_avl_get (metrics_tree, name, (void *) &metric);
138   if (status == 0)
139   {
140     metric->value += delta;
141     metric->last_update = now;
142
143     pthread_mutex_unlock (&metrics_lock);
144     return (0);
145   }
146   else /* no such value yet */
147   {
148     status = statsd_metric_set_unsafe (name, delta, type);
149
150     pthread_mutex_unlock (&metrics_lock);
151     return (status);
152   }
153 } /* }}} int statsd_metric_add */
154
155 static int statsd_handle_counter (char const *name, /* {{{ */
156     char const *value_str,
157     char const *extra)
158 {
159   value_t value;
160   value_t scale;
161   int status;
162
163   if ((extra != NULL) && (extra[0] != '@'))
164     return (-1);
165
166   scale.gauge = 1.0;
167   if (extra != NULL)
168   {
169     status = parse_value (extra + 1, &scale, DS_TYPE_GAUGE);
170     if (status != 0)
171       return (status);
172
173     if (!isfinite (scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
174       return (-1);
175   }
176
177   value.derive = 1;
178   status = parse_value (value_str, &value, DS_TYPE_DERIVE);
179   if (status != 0)
180     return (status);
181
182   if (value.derive < 1)
183     return (-1);
184
185   return (statsd_metric_add (name,
186         (int64_t) (((gauge_t) value.derive) / scale.gauge),
187         STATSD_COUNTER));
188 } /* }}} int statsd_handle_counter */
189
190 static int statsd_handle_gauge (char const *name, /* {{{ */
191     char const *value_str)
192 {
193   value_t value;
194   int status;
195
196   value.derive = 0;
197   status = parse_value (value_str, &value, DS_TYPE_DERIVE);
198   if (status != 0)
199     return (status);
200
201   if ((value_str[0] == '+') || (value_str[0] == '-'))
202     return (statsd_metric_add (name, (int64_t) value.derive, STATSD_GAUGE));
203   else
204     return (statsd_metric_set (name, (int64_t) value.derive, STATSD_GAUGE));
205 } /* }}} int statsd_handle_gauge */
206
207 static int statsd_handle_timer (char const *name, /* {{{ */
208     char const *value_str)
209 {
210   value_t value;
211   int status;
212
213   value.derive = 0;
214   status = parse_value (value_str, &value, DS_TYPE_DERIVE);
215   if (status != 0)
216     return (status);
217
218   return (statsd_metric_add (name, (int64_t) value.derive, STATSD_TIMER));
219 } /* }}} int statsd_handle_timer */
220
221 static int statsd_handle_set (char const *name __attribute__((unused)), /* {{{ */
222     char const *value_str __attribute__((unused)))
223 {
224   static c_complain_t c = C_COMPLAIN_INIT_STATIC;
225
226   c_complain (LOG_WARNING, &c,
227       "statsd plugin: Support for sets is not yet implemented.");
228
229   return (0);
230 } /* }}} int statsd_handle_set */
231
232 static int statsd_parse_line (char *buffer) /* {{{ */
233 {
234   char *name = buffer;
235   char *value;
236   char *type;
237   char *extra;
238
239   type = strchr (name, '|');
240   if (type == NULL)
241     return (-1);
242   *type = 0;
243   type++;
244
245   value = strrchr (name, ':');
246   if (value == NULL)
247     return (-1);
248   *value = 0;
249   value++;
250
251   extra = strchr (type, '|');
252   if (extra != NULL)
253   {
254     *extra = 0;
255     extra++;
256   }
257
258   if (strcmp ("c", type) == 0)
259     return (statsd_handle_counter (name, value, extra));
260
261   /* extra is only valid for counters */
262   if (extra != NULL)
263     return (-1);
264
265   if (strcmp ("g", type) == 0)
266     return (statsd_handle_gauge (name, value));
267   else if (strcmp ("ms", type) == 0)
268     return (statsd_handle_timer (name, value));
269   else if (strcmp ("s", type) == 0)
270     return (statsd_handle_set (name, value));
271   else
272     return (-1);
273 } /* }}} void statsd_parse_line */
274
275 static void statsd_parse_buffer (char *buffer) /* {{{ */
276 {
277   char *dummy;
278   char *saveptr = NULL;
279   char *ptr;
280
281   for (dummy = buffer;
282       (ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL;
283       dummy = NULL)
284   {
285     char *line_orig = sstrdup (ptr);
286     int status;
287
288     status = statsd_parse_line (ptr);
289     if (status != 0)
290       ERROR ("statsd plugin: Unable to parse line: \"%s\"", line_orig);
291
292     sfree (line_orig);
293   }
294 } /* }}} void statsd_parse_buffer */
295
296 static void statsd_network_read (int fd) /* {{{ */
297 {
298   char buffer[4096];
299   size_t buffer_size;
300   ssize_t status;
301
302   status = recv (fd, buffer, sizeof (buffer), /* flags = */ MSG_DONTWAIT);
303   if (status < 0)
304   {
305     char errbuf[1024];
306
307     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
308       return;
309
310     ERROR ("statsd plugin: recv(2) failed: %s",
311         sstrerror (errno, errbuf, sizeof (errbuf)));
312     return;
313   }
314
315   buffer_size = (size_t) status;
316   if (buffer_size >= sizeof (buffer))
317     buffer_size = sizeof (buffer) - 1;
318   buffer[buffer_size] = 0;
319
320   statsd_parse_buffer (buffer);
321 } /* }}} void statsd_network_read */
322
323 static int statsd_network_init (struct pollfd **ret_fds, /* {{{ */
324     size_t *ret_fds_num)
325 {
326   struct pollfd *fds = NULL;
327   size_t fds_num = 0;
328
329   struct addrinfo ai_hints;
330   struct addrinfo *ai_list = NULL;
331   struct addrinfo *ai_ptr;
332   int status;
333
334   char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
335   char const *service = (conf_service != NULL)
336     ? conf_service : STATSD_DEFAULT_SERVICE;
337
338   memset (&ai_hints, 0, sizeof (ai_hints));
339   ai_hints.ai_flags = AI_PASSIVE;
340 #ifdef AI_ADDRCONFIG
341   ai_hints.ai_flags |= AI_ADDRCONFIG;
342 #endif
343   ai_hints.ai_family = AF_UNSPEC;
344   ai_hints.ai_socktype = SOCK_DGRAM;
345
346   status = getaddrinfo (node, service, &ai_hints, &ai_list);
347   if (status != 0)
348   {
349     ERROR ("statsd plugin: getaddrinfo (\"%s\", \"%s\") failed: %s",
350         node, service, gai_strerror (status));
351     return (status);
352   }
353
354   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
355   {
356     int fd;
357     struct pollfd *tmp;
358
359     char dbg_node[NI_MAXHOST];
360     char dbg_service[NI_MAXSERV];
361
362     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
363     if (fd < 0)
364     {
365       char errbuf[1024];
366       ERROR ("statsd plugin: socket(2) failed: %s",
367           sstrerror (errno, errbuf, sizeof (errbuf)));
368       continue;
369     }
370
371     getnameinfo (ai_ptr->ai_addr, ai_ptr->ai_addrlen,
372         dbg_node, sizeof (dbg_node), dbg_service, sizeof (dbg_service),
373         NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV);
374     DEBUG ("statsd plugin: Trying to bind to [%s]:%s ...", dbg_node, dbg_service);
375
376     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
377     if (status != 0)
378     {
379       char errbuf[1024];
380       ERROR ("statsd plugin: bind(2) failed: %s",
381           sstrerror (errno, errbuf, sizeof (errbuf)));
382       close (fd);
383       continue;
384     }
385
386     tmp = realloc (fds, sizeof (*fds) * (fds_num + 1));
387     if (tmp == NULL)
388     {
389       ERROR ("statsd plugin: realloc failed.");
390       continue;
391     }
392     fds = tmp;
393     tmp = fds + fds_num;
394     fds_num++;
395
396     memset (tmp, 0, sizeof (*tmp));
397     tmp->fd = fd;
398     tmp->events = POLLIN | POLLPRI;
399   }
400
401   freeaddrinfo (ai_list);
402
403   if (fds_num == 0)
404   {
405     ERROR ("statsd plugin: Unable to create listening socket for [%s]:%s.",
406         (node != NULL) ? node : "::", service);
407     return (ENOENT);
408   }
409
410   *ret_fds = fds;
411   *ret_fds_num = fds_num;
412   return (0);
413 } /* }}} int statsd_network_init */
414
415 static void *statsd_network_thread (void *args) /* {{{ */
416 {
417   struct pollfd *fds = NULL;
418   size_t fds_num = 0;
419   int status;
420   size_t i;
421
422   status = statsd_network_init (&fds, &fds_num);
423   if (status != 0)
424   {
425     ERROR ("statsd plugin: Unable to open listening sockets.");
426     pthread_exit ((void *) 0);
427   }
428
429   while (!network_thread_shutdown)
430   {
431     status = poll (fds, (nfds_t) fds_num, /* timeout = */ -1);
432     if (status < 0)
433     {
434       char errbuf[1024];
435
436       if ((errno == EINTR) || (errno == EAGAIN))
437         continue;
438
439       ERROR ("statsd plugin: poll(2) failed: %s",
440           sstrerror (errno, errbuf, sizeof (errbuf)));
441       break;
442     }
443
444     for (i = 0; i < fds_num; i++)
445     {
446       if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
447         continue;
448
449       statsd_network_read (fds[i].fd);
450       fds[i].revents = 0;
451     }
452   } /* while (!network_thread_shutdown) */
453
454   /* Clean up */
455   for (i = 0; i < fds_num; i++)
456     close (fds[i].fd);
457   sfree (fds);
458
459   return ((void *) 0);
460 } /* }}} void *statsd_network_thread */
461
462 static int statsd_init (void) /* {{{ */
463 {
464   pthread_mutex_lock (&metrics_lock);
465   if (metrics_tree == NULL)
466     metrics_tree = c_avl_create ((void *) strcasecmp);
467
468   if (!network_thread_running)
469   {
470     int status;
471
472     status = pthread_create (&network_thread,
473         /* attr = */ NULL,
474         statsd_network_thread,
475         /* args = */ NULL);
476     if (status != 0)
477     {
478       char errbuf[1024];
479       pthread_mutex_unlock (&metrics_lock);
480       ERROR ("statsd plugin: pthread_create failed: %s",
481           sstrerror (errno, errbuf, sizeof (errbuf)));
482       return (status);
483     }
484   }
485   network_thread_running = 1;
486
487   pthread_mutex_unlock (&metrics_lock);
488
489   return (0);
490 } /* }}} int statsd_init */
491
492 static int statsd_metric_submit (char const *name, /* {{{ */
493     statsd_metric_t const *metric)
494 {
495   value_t values[1];
496   value_list_t vl = VALUE_LIST_INIT;
497
498   if (metric->type == STATSD_GAUGE)
499     values[0].gauge = (gauge_t) metric->value;
500   else
501     values[0].derive = (derive_t) metric->value;
502
503   vl.values = values;
504   vl.values_len = 1;
505   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
506   sstrncpy (vl.plugin, "statsd", sizeof (vl.plugin));
507
508   if (metric->type == STATSD_GAUGE)
509     sstrncpy (vl.type, "gauge", sizeof (vl.type));
510   else if (metric->type == STATSD_TIMER)
511     sstrncpy (vl.type, "total_time_in_ms", sizeof (vl.type));
512   else /* if (metric->type == STATSD_COUNTER) */
513     sstrncpy (vl.type, "derive", sizeof (vl.type));
514
515   sstrncpy (vl.type_instance, name, sizeof (vl.type_instance));
516
517   return (plugin_dispatch_values (&vl));
518 } /* }}} int statsd_metric_submit */
519
520 static int statsd_read (void) /* {{{ */
521 {
522   c_avl_iterator_t *i;
523   char *name;
524   statsd_metric_t *metric;
525
526   pthread_mutex_lock (&metrics_lock);
527
528   if (metrics_tree == NULL)
529   {
530     pthread_mutex_unlock (&metrics_lock);
531     return (0);
532   }
533
534   i = c_avl_get_iterator (metrics_tree);
535   while (c_avl_iterator_next (i, (void *) &name, (void *) &metric) == 0)
536     statsd_metric_submit (name, metric);
537   c_avl_iterator_destroy (i);
538
539   pthread_mutex_unlock (&metrics_lock);
540
541   return (0);
542 } /* }}} int statsd_read */
543
544 static int statsd_shutdown (void) /* {{{ */
545 {
546   void *key;
547   void *value;
548
549   pthread_mutex_lock (&metrics_lock);
550
551   if (network_thread_running)
552   {
553     network_thread_shutdown = 1;
554     pthread_kill (network_thread, SIGTERM);
555     pthread_join (network_thread, /* retval = */ NULL);
556   }
557   network_thread_running = 0;
558
559   while (c_avl_pick (metrics_tree, &key, &value) == 0)
560   {
561     sfree (key);
562     sfree (value);
563   }
564   c_avl_destroy (metrics_tree);
565   metrics_tree = NULL;
566
567   pthread_mutex_unlock (&metrics_lock);
568
569   return (0);
570 } /* }}} int statsd_shutdown */
571
572 void module_register (void)
573 {
574   plugin_register_init ("statsd", statsd_init);
575   plugin_register_read ("statsd", statsd_read);
576   plugin_register_shutdown ("statsd", statsd_shutdown);
577 }
578
579 /* vim: set sw=2 sts=2 et fdm=marker : */