statsd plugin: Implement the "Delete{Counters,Timers,Gauges}" options.
[collectd.git] / src / statsd.c
1 /**
2  * collectd - src/statsd.c
3  *
4  * Copyright (C) 2013       Florian octo Forster
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  * Authors:
19  *   Florian octo Forster <octo at collectd.org>
20  */
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_avltree.h"
27 #include "utils_complain.h"
28
29 #include <pthread.h>
30
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <netdb.h>
34 #include <poll.h>
35
36 #ifndef STATSD_DEFAULT_NODE
37 # define STATSD_DEFAULT_NODE NULL
38 #endif
39
40 #ifndef STATSD_DEFAULT_SERVICE
41 # define STATSD_DEFAULT_SERVICE "8125"
42 #endif
43
44 enum metric_type_e
45 {
46   STATSD_COUNTER,
47   STATSD_TIMER,
48   STATSD_GAUGE
49 };
50 typedef enum metric_type_e metric_type_t;
51
52 struct statsd_metric_s
53 {
54   metric_type_t type;
55   int64_t value;
56   unsigned long updates_num;
57 };
58 typedef struct statsd_metric_s statsd_metric_t;
59
60 static c_avl_tree_t   *metrics_tree = NULL;
61 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
62
63 static pthread_t network_thread;
64 static _Bool     network_thread_running = 0;
65 static _Bool     network_thread_shutdown = 0;
66
67 static char *conf_node = NULL;
68 static char *conf_service = NULL;
69
70 static _Bool conf_delete_counters = 0;
71 static _Bool conf_delete_timers   = 0;
72 static _Bool conf_delete_gauges   = 0;
73
74 /* Must hold metrics_lock when calling this function. */
75 static int statsd_metric_set_unsafe (char const *name, int64_t value, /* {{{ */
76     metric_type_t type)
77 {
78   statsd_metric_t *metric;
79   char *key;
80   int status;
81
82   status = c_avl_get (metrics_tree, name, (void *) &metric);
83   if (status == 0)
84   {
85     metric->value = value;
86     metric->updates_num++;
87
88     return (0);
89   }
90
91   DEBUG ("stats plugin: Adding new metric \"%s\".", name);
92   /* FIXME: The keys should have a prefix so counter, gauge and timer with the
93    * same name can exist. */
94   key = strdup (name);
95   metric = calloc (1, sizeof (*metric));
96   if ((key == NULL) || (metric == NULL))
97   {
98     sfree (key);
99     sfree (metric);
100     return (-1);
101   }
102
103   metric->type = type;
104   metric->value = value;
105   metric->updates_num = 1;
106
107   status = c_avl_insert (metrics_tree, key, metric);
108   if (status != 0)
109   {
110     sfree (key);
111     sfree (metric);
112
113     return (-1);
114   }
115
116   return (0);
117 } /* }}} int statsd_metric_set_unsafe */
118
119 static int statsd_metric_set (char const *name, int64_t value, /* {{{ */
120     metric_type_t type)
121 {
122   int status;
123
124   pthread_mutex_lock (&metrics_lock);
125   status = statsd_metric_set_unsafe (name, value, type);
126   pthread_mutex_unlock (&metrics_lock);
127
128   return (status);
129 } /* }}} int statsd_metric_set */
130
131 static int statsd_metric_add (char const *name, int64_t delta, /* {{{ */
132     metric_type_t type)
133 {
134   statsd_metric_t *metric;
135   int status;
136
137   pthread_mutex_lock (&metrics_lock);
138
139   status = c_avl_get (metrics_tree, name, (void *) &metric);
140   if (status == 0)
141   {
142     metric->value += delta;
143     metric->updates_num++;
144
145     pthread_mutex_unlock (&metrics_lock);
146     return (0);
147   }
148   else /* no such value yet */
149   {
150     status = statsd_metric_set_unsafe (name, delta, type);
151
152     pthread_mutex_unlock (&metrics_lock);
153     return (status);
154   }
155 } /* }}} int statsd_metric_add */
156
157 static int statsd_handle_counter (char const *name, /* {{{ */
158     char const *value_str,
159     char const *extra)
160 {
161   value_t value;
162   value_t scale;
163   int status;
164
165   if ((extra != NULL) && (extra[0] != '@'))
166     return (-1);
167
168   scale.gauge = 1.0;
169   if (extra != NULL)
170   {
171     status = parse_value (extra + 1, &scale, DS_TYPE_GAUGE);
172     if (status != 0)
173       return (status);
174
175     if (!isfinite (scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
176       return (-1);
177   }
178
179   value.derive = 1;
180   status = parse_value (value_str, &value, DS_TYPE_DERIVE);
181   if (status != 0)
182     return (status);
183
184   if (value.derive < 1)
185     return (-1);
186
187   return (statsd_metric_add (name,
188         (int64_t) (((gauge_t) value.derive) / scale.gauge),
189         STATSD_COUNTER));
190 } /* }}} int statsd_handle_counter */
191
192 static int statsd_handle_gauge (char const *name, /* {{{ */
193     char const *value_str)
194 {
195   value_t value;
196   int status;
197
198   value.derive = 0;
199   status = parse_value (value_str, &value, DS_TYPE_DERIVE);
200   if (status != 0)
201     return (status);
202
203   if ((value_str[0] == '+') || (value_str[0] == '-'))
204     return (statsd_metric_add (name, (int64_t) value.derive, STATSD_GAUGE));
205   else
206     return (statsd_metric_set (name, (int64_t) value.derive, STATSD_GAUGE));
207 } /* }}} int statsd_handle_gauge */
208
209 static int statsd_handle_timer (char const *name, /* {{{ */
210     char const *value_str)
211 {
212   value_t value;
213   int status;
214
215   value.derive = 0;
216   status = parse_value (value_str, &value, DS_TYPE_DERIVE);
217   if (status != 0)
218     return (status);
219
220   return (statsd_metric_add (name, (int64_t) value.derive, STATSD_TIMER));
221 } /* }}} int statsd_handle_timer */
222
223 static int statsd_handle_set (char const *name __attribute__((unused)), /* {{{ */
224     char const *value_str __attribute__((unused)))
225 {
226   static c_complain_t c = C_COMPLAIN_INIT_STATIC;
227
228   c_complain (LOG_WARNING, &c,
229       "statsd plugin: Support for sets is not yet implemented.");
230
231   return (0);
232 } /* }}} int statsd_handle_set */
233
234 static int statsd_parse_line (char *buffer) /* {{{ */
235 {
236   char *name = buffer;
237   char *value;
238   char *type;
239   char *extra;
240
241   type = strchr (name, '|');
242   if (type == NULL)
243     return (-1);
244   *type = 0;
245   type++;
246
247   value = strrchr (name, ':');
248   if (value == NULL)
249     return (-1);
250   *value = 0;
251   value++;
252
253   extra = strchr (type, '|');
254   if (extra != NULL)
255   {
256     *extra = 0;
257     extra++;
258   }
259
260   if (strcmp ("c", type) == 0)
261     return (statsd_handle_counter (name, value, extra));
262
263   /* extra is only valid for counters */
264   if (extra != NULL)
265     return (-1);
266
267   if (strcmp ("g", type) == 0)
268     return (statsd_handle_gauge (name, value));
269   else if (strcmp ("ms", type) == 0)
270     return (statsd_handle_timer (name, value));
271   else if (strcmp ("s", type) == 0)
272     return (statsd_handle_set (name, value));
273   else
274     return (-1);
275 } /* }}} void statsd_parse_line */
276
277 static void statsd_parse_buffer (char *buffer) /* {{{ */
278 {
279   char *dummy;
280   char *saveptr = NULL;
281   char *ptr;
282
283   for (dummy = buffer;
284       (ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL;
285       dummy = NULL)
286   {
287     char *line_orig = sstrdup (ptr);
288     int status;
289
290     status = statsd_parse_line (ptr);
291     if (status != 0)
292       ERROR ("statsd plugin: Unable to parse line: \"%s\"", line_orig);
293
294     sfree (line_orig);
295   }
296 } /* }}} void statsd_parse_buffer */
297
298 static void statsd_network_read (int fd) /* {{{ */
299 {
300   char buffer[4096];
301   size_t buffer_size;
302   ssize_t status;
303
304   status = recv (fd, buffer, sizeof (buffer), /* flags = */ MSG_DONTWAIT);
305   if (status < 0)
306   {
307     char errbuf[1024];
308
309     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
310       return;
311
312     ERROR ("statsd plugin: recv(2) failed: %s",
313         sstrerror (errno, errbuf, sizeof (errbuf)));
314     return;
315   }
316
317   buffer_size = (size_t) status;
318   if (buffer_size >= sizeof (buffer))
319     buffer_size = sizeof (buffer) - 1;
320   buffer[buffer_size] = 0;
321
322   statsd_parse_buffer (buffer);
323 } /* }}} void statsd_network_read */
324
325 static int statsd_network_init (struct pollfd **ret_fds, /* {{{ */
326     size_t *ret_fds_num)
327 {
328   struct pollfd *fds = NULL;
329   size_t fds_num = 0;
330
331   struct addrinfo ai_hints;
332   struct addrinfo *ai_list = NULL;
333   struct addrinfo *ai_ptr;
334   int status;
335
336   char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
337   char const *service = (conf_service != NULL)
338     ? conf_service : STATSD_DEFAULT_SERVICE;
339
340   memset (&ai_hints, 0, sizeof (ai_hints));
341   ai_hints.ai_flags = AI_PASSIVE;
342 #ifdef AI_ADDRCONFIG
343   ai_hints.ai_flags |= AI_ADDRCONFIG;
344 #endif
345   ai_hints.ai_family = AF_UNSPEC;
346   ai_hints.ai_socktype = SOCK_DGRAM;
347
348   status = getaddrinfo (node, service, &ai_hints, &ai_list);
349   if (status != 0)
350   {
351     ERROR ("statsd plugin: getaddrinfo (\"%s\", \"%s\") failed: %s",
352         node, service, gai_strerror (status));
353     return (status);
354   }
355
356   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
357   {
358     int fd;
359     struct pollfd *tmp;
360
361     char dbg_node[NI_MAXHOST];
362     char dbg_service[NI_MAXSERV];
363
364     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
365     if (fd < 0)
366     {
367       char errbuf[1024];
368       ERROR ("statsd plugin: socket(2) failed: %s",
369           sstrerror (errno, errbuf, sizeof (errbuf)));
370       continue;
371     }
372
373     getnameinfo (ai_ptr->ai_addr, ai_ptr->ai_addrlen,
374         dbg_node, sizeof (dbg_node), dbg_service, sizeof (dbg_service),
375         NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV);
376     DEBUG ("statsd plugin: Trying to bind to [%s]:%s ...", dbg_node, dbg_service);
377
378     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
379     if (status != 0)
380     {
381       char errbuf[1024];
382       ERROR ("statsd plugin: bind(2) failed: %s",
383           sstrerror (errno, errbuf, sizeof (errbuf)));
384       close (fd);
385       continue;
386     }
387
388     tmp = realloc (fds, sizeof (*fds) * (fds_num + 1));
389     if (tmp == NULL)
390     {
391       ERROR ("statsd plugin: realloc failed.");
392       continue;
393     }
394     fds = tmp;
395     tmp = fds + fds_num;
396     fds_num++;
397
398     memset (tmp, 0, sizeof (*tmp));
399     tmp->fd = fd;
400     tmp->events = POLLIN | POLLPRI;
401   }
402
403   freeaddrinfo (ai_list);
404
405   if (fds_num == 0)
406   {
407     ERROR ("statsd plugin: Unable to create listening socket for [%s]:%s.",
408         (node != NULL) ? node : "::", service);
409     return (ENOENT);
410   }
411
412   *ret_fds = fds;
413   *ret_fds_num = fds_num;
414   return (0);
415 } /* }}} int statsd_network_init */
416
417 static void *statsd_network_thread (void *args) /* {{{ */
418 {
419   struct pollfd *fds = NULL;
420   size_t fds_num = 0;
421   int status;
422   size_t i;
423
424   status = statsd_network_init (&fds, &fds_num);
425   if (status != 0)
426   {
427     ERROR ("statsd plugin: Unable to open listening sockets.");
428     pthread_exit ((void *) 0);
429   }
430
431   while (!network_thread_shutdown)
432   {
433     status = poll (fds, (nfds_t) fds_num, /* timeout = */ -1);
434     if (status < 0)
435     {
436       char errbuf[1024];
437
438       if ((errno == EINTR) || (errno == EAGAIN))
439         continue;
440
441       ERROR ("statsd plugin: poll(2) failed: %s",
442           sstrerror (errno, errbuf, sizeof (errbuf)));
443       break;
444     }
445
446     for (i = 0; i < fds_num; i++)
447     {
448       if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
449         continue;
450
451       statsd_network_read (fds[i].fd);
452       fds[i].revents = 0;
453     }
454   } /* while (!network_thread_shutdown) */
455
456   /* Clean up */
457   for (i = 0; i < fds_num; i++)
458     close (fds[i].fd);
459   sfree (fds);
460
461   return ((void *) 0);
462 } /* }}} void *statsd_network_thread */
463
464 static int statsd_config (oconfig_item_t *ci) /* {{{ */
465 {
466   int i;
467
468   for (i = 0; i < ci->children_num; i++)
469   {
470     oconfig_item_t *child = ci->children + i;
471
472     if (strcasecmp ("Host", child->key) == 0)
473       cf_util_get_string (child, &conf_node);
474     else if (strcasecmp ("Port", child->key) == 0)
475       cf_util_get_service (child, &conf_service);
476     else if (strcasecmp ("DeleteCounters", child->key) == 0)
477       cf_util_get_boolean (child, &conf_delete_counters);
478     else if (strcasecmp ("DeleteTimers", child->key) == 0)
479       cf_util_get_boolean (child, &conf_delete_timers);
480     else if (strcasecmp ("DeleteGauges", child->key) == 0)
481       cf_util_get_boolean (child, &conf_delete_gauges);
482     else
483       ERROR ("statsd plugin: The \"%s\" config option is not valid.",
484           child->key);
485   }
486
487   return (0);
488 } /* }}} int statsd_config */
489
490 static int statsd_init (void) /* {{{ */
491 {
492   pthread_mutex_lock (&metrics_lock);
493   if (metrics_tree == NULL)
494     metrics_tree = c_avl_create ((void *) strcasecmp);
495
496   if (!network_thread_running)
497   {
498     int status;
499
500     status = pthread_create (&network_thread,
501         /* attr = */ NULL,
502         statsd_network_thread,
503         /* args = */ NULL);
504     if (status != 0)
505     {
506       char errbuf[1024];
507       pthread_mutex_unlock (&metrics_lock);
508       ERROR ("statsd plugin: pthread_create failed: %s",
509           sstrerror (errno, errbuf, sizeof (errbuf)));
510       return (status);
511     }
512   }
513   network_thread_running = 1;
514
515   pthread_mutex_unlock (&metrics_lock);
516
517   return (0);
518 } /* }}} int statsd_init */
519
520 static int statsd_metric_submit (char const *name, /* {{{ */
521     statsd_metric_t const *metric)
522 {
523   value_t values[1];
524   value_list_t vl = VALUE_LIST_INIT;
525
526   if (metric->type == STATSD_GAUGE)
527     values[0].gauge = (gauge_t) metric->value;
528   else if (metric->type == STATSD_TIMER)
529   {
530     if (metric->updates_num == 0)
531       values[0].gauge = NAN;
532     else
533       values[0].gauge =
534         ((gauge_t) metric->value) / ((gauge_t) metric->updates_num);
535   }
536   else
537     values[0].derive = (derive_t) metric->value;
538
539   vl.values = values;
540   vl.values_len = 1;
541   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
542   sstrncpy (vl.plugin, "statsd", sizeof (vl.plugin));
543
544   if (metric->type == STATSD_GAUGE)
545     sstrncpy (vl.type, "gauge", sizeof (vl.type));
546   else if (metric->type == STATSD_TIMER)
547     sstrncpy (vl.type, "latency", sizeof (vl.type));
548   else /* if (metric->type == STATSD_COUNTER) */
549     sstrncpy (vl.type, "derive", sizeof (vl.type));
550
551   sstrncpy (vl.type_instance, name, sizeof (vl.type_instance));
552
553   return (plugin_dispatch_values (&vl));
554 } /* }}} int statsd_metric_submit */
555
556 static int statsd_read (void) /* {{{ */
557 {
558   c_avl_iterator_t *iter;
559   char *name;
560   statsd_metric_t *metric;
561
562   char **to_be_deleted = NULL;
563   size_t to_be_deleted_num = 0;
564   size_t i;
565
566   pthread_mutex_lock (&metrics_lock);
567
568   if (metrics_tree == NULL)
569   {
570     pthread_mutex_unlock (&metrics_lock);
571     return (0);
572   }
573
574   iter = c_avl_get_iterator (metrics_tree);
575   while (c_avl_iterator_next (iter, (void *) &name, (void *) &metric) == 0)
576   {
577     if ((metric->updates_num == 0)
578         && ((conf_delete_counters && (metric->type == STATSD_COUNTER))
579           || (conf_delete_timers && (metric->type == STATSD_TIMER))
580           || (conf_delete_gauges && (metric->type == STATSD_GAUGE))))
581     {
582       DEBUG ("statsd plugin: Deleting metric \"%s\".", name);
583       strarray_add (&to_be_deleted, &to_be_deleted_num, name);
584       continue;
585     }
586
587     statsd_metric_submit (name, metric);
588     metric->updates_num = 0;
589   }
590   c_avl_iterator_destroy (iter);
591
592   for (i = 0; i < to_be_deleted_num; i++)
593   {
594     int status;
595
596     status = c_avl_remove (metrics_tree, to_be_deleted[i],
597         (void *) &name, (void *) &metric);
598     if (status != 0)
599     {
600       ERROR ("stats plugin: c_avl_remove (\"%s\") failed with status %i.",
601           to_be_deleted[i], status);
602       continue;
603     }
604
605     sfree (name);
606     sfree (metric);
607   }
608
609   pthread_mutex_unlock (&metrics_lock);
610
611   strarray_free (to_be_deleted, to_be_deleted_num);
612
613   return (0);
614 } /* }}} int statsd_read */
615
616 static int statsd_shutdown (void) /* {{{ */
617 {
618   void *key;
619   void *value;
620
621   pthread_mutex_lock (&metrics_lock);
622
623   if (network_thread_running)
624   {
625     network_thread_shutdown = 1;
626     pthread_kill (network_thread, SIGTERM);
627     pthread_join (network_thread, /* retval = */ NULL);
628   }
629   network_thread_running = 0;
630
631   while (c_avl_pick (metrics_tree, &key, &value) == 0)
632   {
633     sfree (key);
634     sfree (value);
635   }
636   c_avl_destroy (metrics_tree);
637   metrics_tree = NULL;
638
639   sfree (conf_node);
640   sfree (conf_service);
641
642   pthread_mutex_unlock (&metrics_lock);
643
644   return (0);
645 } /* }}} int statsd_shutdown */
646
647 void module_register (void)
648 {
649   plugin_register_complex_config ("statsd", statsd_config);
650   plugin_register_init ("statsd", statsd_init);
651   plugin_register_read ("statsd", statsd_read);
652   plugin_register_shutdown ("statsd", statsd_shutdown);
653 }
654
655 /* vim: set sw=2 sts=2 et fdm=marker : */