Merge branch 'ff/statsd'
[collectd.git] / src / statsd.c
1 /**
2  * collectd - src/statsd.c
3  *
4  * Copyright (C) 2013       Florian octo Forster
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  * Authors:
19  *   Florian octo Forster <octo at collectd.org>
20  */
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_avltree.h"
27 #include "utils_complain.h"
28 #include "utils_latency.h"
29
30 #include <pthread.h>
31
32 #include <sys/types.h>
33 #include <sys/socket.h>
34 #include <netdb.h>
35 #include <poll.h>
36
37 #ifndef STATSD_DEFAULT_NODE
38 # define STATSD_DEFAULT_NODE NULL
39 #endif
40
41 #ifndef STATSD_DEFAULT_SERVICE
42 # define STATSD_DEFAULT_SERVICE "8125"
43 #endif
44
45 enum metric_type_e
46 {
47   STATSD_COUNTER,
48   STATSD_TIMER,
49   STATSD_GAUGE,
50   STATSD_SET
51 };
52 typedef enum metric_type_e metric_type_t;
53
54 struct statsd_metric_s
55 {
56   metric_type_t type;
57   double value;
58   latency_counter_t *latency;
59   c_avl_tree_t *set;
60   unsigned long updates_num;
61 };
62 typedef struct statsd_metric_s statsd_metric_t;
63
64 static c_avl_tree_t   *metrics_tree = NULL;
65 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 static pthread_t network_thread;
68 static _Bool     network_thread_running = 0;
69 static _Bool     network_thread_shutdown = 0;
70
71 static char *conf_node = NULL;
72 static char *conf_service = NULL;
73
74 static _Bool conf_delete_counters = 0;
75 static _Bool conf_delete_timers   = 0;
76 static _Bool conf_delete_gauges   = 0;
77 static _Bool conf_delete_sets     = 0;
78
79 static double *conf_timer_percentile = NULL;
80 static size_t  conf_timer_percentile_num = 0;
81
82 /* Must hold metrics_lock when calling this function. */
83 static statsd_metric_t *statsd_metric_lookup_unsafe (char const *name, /* {{{ */
84     metric_type_t type)
85 {
86   char key[DATA_MAX_NAME_LEN + 2];
87   char *key_copy;
88   statsd_metric_t *metric;
89   int status;
90
91   switch (type)
92   {
93     case STATSD_COUNTER: key[0] = 'c'; break;
94     case STATSD_TIMER:   key[0] = 't'; break;
95     case STATSD_GAUGE:   key[0] = 'g'; break;
96     case STATSD_SET:     key[0] = 's'; break;
97     default: return (NULL);
98   }
99
100   key[1] = ':';
101   sstrncpy (&key[2], name, sizeof (key) - 2);
102
103   status = c_avl_get (metrics_tree, key, (void *) &metric);
104   if (status == 0)
105     return (metric);
106
107   key_copy = strdup (key);
108   if (key_copy == NULL)
109   {
110     ERROR ("statsd plugin: strdup failed.");
111     return (NULL);
112   }
113
114   metric = malloc (sizeof (*metric));
115   if (metric == NULL)
116   {
117     ERROR ("statsd plugin: malloc failed.");
118     sfree (key_copy);
119     return (NULL);
120   }
121   memset (metric, 0, sizeof (*metric));
122
123   metric->type = type;
124   metric->latency = NULL;
125   metric->set = NULL;
126
127   status = c_avl_insert (metrics_tree, key_copy, metric);
128   if (status != 0)
129   {
130     ERROR ("statsd plugin: c_avl_insert failed.");
131     sfree (key_copy);
132     sfree (metric);
133     return (NULL);
134   }
135
136   return (metric);
137 } /* }}} statsd_metric_lookup_unsafe */
138
139 static int statsd_metric_set (char const *name, double value, /* {{{ */
140     metric_type_t type)
141 {
142   statsd_metric_t *metric;
143
144   pthread_mutex_lock (&metrics_lock);
145
146   metric = statsd_metric_lookup_unsafe (name, type);
147   if (metric == NULL)
148   {
149     pthread_mutex_unlock (&metrics_lock);
150     return (-1);
151   }
152
153   metric->value = value;
154   metric->updates_num++;
155
156   pthread_mutex_unlock (&metrics_lock);
157
158   return (0);
159 } /* }}} int statsd_metric_set */
160
161 static int statsd_metric_add (char const *name, double delta, /* {{{ */
162     metric_type_t type)
163 {
164   statsd_metric_t *metric;
165
166   pthread_mutex_lock (&metrics_lock);
167
168   metric = statsd_metric_lookup_unsafe (name, type);
169   if (metric == NULL)
170   {
171     pthread_mutex_unlock (&metrics_lock);
172     return (-1);
173   }
174
175   metric->value += delta;
176   metric->updates_num++;
177
178   pthread_mutex_unlock (&metrics_lock);
179
180   return (0);
181 } /* }}} int statsd_metric_add */
182
183 static int statsd_parse_value (char const *str, value_t *ret_value) /* {{{ */
184 {
185   char *endptr = NULL;
186
187   ret_value->gauge = (gauge_t) strtod (str, &endptr);
188   if ((str == endptr) || ((endptr != NULL) && (*endptr != 0)))
189     return (-1);
190
191   return (0);
192 } /* }}} int statsd_parse_value */
193
194 static int statsd_handle_counter (char const *name, /* {{{ */
195     char const *value_str,
196     char const *extra)
197 {
198   value_t value;
199   value_t scale;
200   int status;
201
202   if ((extra != NULL) && (extra[0] != '@'))
203     return (-1);
204
205   scale.gauge = 1.0;
206   if (extra != NULL)
207   {
208     status = statsd_parse_value (extra + 1, &scale);
209     if (status != 0)
210       return (status);
211
212     if (!isfinite (scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
213       return (-1);
214   }
215
216   value.gauge = 1.0;
217   status = statsd_parse_value (value_str, &value);
218   if (status != 0)
219     return (status);
220
221   return (statsd_metric_add (name, (double) (value.gauge / scale.gauge),
222         STATSD_COUNTER));
223 } /* }}} int statsd_handle_counter */
224
225 static int statsd_handle_gauge (char const *name, /* {{{ */
226     char const *value_str)
227 {
228   value_t value;
229   int status;
230
231   value.gauge = 0;
232   status = statsd_parse_value (value_str, &value);
233   if (status != 0)
234     return (status);
235
236   if ((value_str[0] == '+') || (value_str[0] == '-'))
237     return (statsd_metric_add (name, (double) value.gauge, STATSD_GAUGE));
238   else
239     return (statsd_metric_set (name, (double) value.gauge, STATSD_GAUGE));
240 } /* }}} int statsd_handle_gauge */
241
242 static int statsd_handle_timer (char const *name, /* {{{ */
243     char const *value_str)
244 {
245   statsd_metric_t *metric;
246   value_t value_ms;
247   cdtime_t value;
248   int status;
249
250   value_ms.derive = 0;
251   status = statsd_parse_value (value_str, &value_ms);
252   if (status != 0)
253     return (status);
254
255   value = MS_TO_CDTIME_T (value_ms.gauge);
256
257   pthread_mutex_lock (&metrics_lock);
258
259   metric = statsd_metric_lookup_unsafe (name, STATSD_TIMER);
260   if (metric == NULL)
261   {
262     pthread_mutex_unlock (&metrics_lock);
263     return (-1);
264   }
265
266   if (metric->latency == NULL)
267     metric->latency = latency_counter_create ();
268   if (metric->latency == NULL)
269   {
270     pthread_mutex_unlock (&metrics_lock);
271     return (-1);
272   }
273
274   latency_counter_add (metric->latency, value);
275   metric->updates_num++;
276
277   pthread_mutex_unlock (&metrics_lock);
278   return (0);
279 } /* }}} int statsd_handle_timer */
280
281 static int statsd_handle_set (char const *name, /* {{{ */
282     char const *set_key_orig)
283 {
284   statsd_metric_t *metric = NULL;
285   char *set_key;
286   int status;
287
288   pthread_mutex_lock (&metrics_lock);
289
290   metric = statsd_metric_lookup_unsafe (name, STATSD_SET);
291   if (metric == NULL)
292   {
293     pthread_mutex_unlock (&metrics_lock);
294     return (-1);
295   }
296
297   /* Make sure metric->set exists. */
298   if (metric->set == NULL)
299     metric->set = c_avl_create ((void *) strcmp);
300
301   if (metric->set == NULL)
302   {
303     pthread_mutex_unlock (&metrics_lock);
304     ERROR ("statsd plugin: c_avl_create failed.");
305     return (-1);
306   }
307
308   set_key = strdup (set_key_orig);
309   if (set_key == NULL)
310   {
311     pthread_mutex_unlock (&metrics_lock);
312     ERROR ("statsd plugin: strdup failed.");
313     return (-1);
314   }
315
316   status = c_avl_insert (metric->set, set_key, /* value = */ NULL);
317   if (status < 0)
318   {
319     pthread_mutex_unlock (&metrics_lock);
320     if (status < 0)
321       ERROR ("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
322           set_key, status);
323     sfree (set_key);
324     return (-1);
325   }
326   else if (status > 0) /* key already exists */
327   {
328     sfree (set_key);
329   }
330
331   metric->updates_num++;
332
333   pthread_mutex_unlock (&metrics_lock);
334   return (0);
335 } /* }}} int statsd_handle_set */
336
337 static int statsd_parse_line (char *buffer) /* {{{ */
338 {
339   char *name = buffer;
340   char *value;
341   char *type;
342   char *extra;
343
344   type = strchr (name, '|');
345   if (type == NULL)
346     return (-1);
347   *type = 0;
348   type++;
349
350   value = strrchr (name, ':');
351   if (value == NULL)
352     return (-1);
353   *value = 0;
354   value++;
355
356   extra = strchr (type, '|');
357   if (extra != NULL)
358   {
359     *extra = 0;
360     extra++;
361   }
362
363   if (strcmp ("c", type) == 0)
364     return (statsd_handle_counter (name, value, extra));
365
366   /* extra is only valid for counters */
367   if (extra != NULL)
368     return (-1);
369
370   if (strcmp ("g", type) == 0)
371     return (statsd_handle_gauge (name, value));
372   else if (strcmp ("ms", type) == 0)
373     return (statsd_handle_timer (name, value));
374   else if (strcmp ("s", type) == 0)
375     return (statsd_handle_set (name, value));
376   else
377     return (-1);
378 } /* }}} void statsd_parse_line */
379
380 static void statsd_parse_buffer (char *buffer) /* {{{ */
381 {
382   while (buffer != NULL)
383   {
384     char orig[64];
385     char *next;
386     int status;
387
388     next = strchr (buffer, '\n');
389     if (next != NULL)
390     {
391       *next = 0;
392       next++;
393     }
394
395     if (*buffer == 0)
396     {
397       buffer = next;
398       continue;
399     }
400
401     sstrncpy (orig, buffer, sizeof (orig));
402
403     status = statsd_parse_line (buffer);
404     if (status != 0)
405       ERROR ("statsd plugin: Unable to parse line: \"%s\"", orig);
406
407     buffer = next;
408   }
409 } /* }}} void statsd_parse_buffer */
410
411 static void statsd_network_read (int fd) /* {{{ */
412 {
413   char buffer[4096];
414   size_t buffer_size;
415   ssize_t status;
416
417   status = recv (fd, buffer, sizeof (buffer), /* flags = */ MSG_DONTWAIT);
418   if (status < 0)
419   {
420     char errbuf[1024];
421
422     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
423       return;
424
425     ERROR ("statsd plugin: recv(2) failed: %s",
426         sstrerror (errno, errbuf, sizeof (errbuf)));
427     return;
428   }
429
430   buffer_size = (size_t) status;
431   if (buffer_size >= sizeof (buffer))
432     buffer_size = sizeof (buffer) - 1;
433   buffer[buffer_size] = 0;
434
435   statsd_parse_buffer (buffer);
436 } /* }}} void statsd_network_read */
437
438 static int statsd_network_init (struct pollfd **ret_fds, /* {{{ */
439     size_t *ret_fds_num)
440 {
441   struct pollfd *fds = NULL;
442   size_t fds_num = 0;
443
444   struct addrinfo ai_hints;
445   struct addrinfo *ai_list = NULL;
446   struct addrinfo *ai_ptr;
447   int status;
448
449   char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
450   char const *service = (conf_service != NULL)
451     ? conf_service : STATSD_DEFAULT_SERVICE;
452
453   memset (&ai_hints, 0, sizeof (ai_hints));
454   ai_hints.ai_flags = AI_PASSIVE;
455 #ifdef AI_ADDRCONFIG
456   ai_hints.ai_flags |= AI_ADDRCONFIG;
457 #endif
458   ai_hints.ai_family = AF_UNSPEC;
459   ai_hints.ai_socktype = SOCK_DGRAM;
460
461   status = getaddrinfo (node, service, &ai_hints, &ai_list);
462   if (status != 0)
463   {
464     ERROR ("statsd plugin: getaddrinfo (\"%s\", \"%s\") failed: %s",
465         node, service, gai_strerror (status));
466     return (status);
467   }
468
469   for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
470   {
471     int fd;
472     struct pollfd *tmp;
473
474     char dbg_node[NI_MAXHOST];
475     char dbg_service[NI_MAXSERV];
476
477     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
478     if (fd < 0)
479     {
480       char errbuf[1024];
481       ERROR ("statsd plugin: socket(2) failed: %s",
482           sstrerror (errno, errbuf, sizeof (errbuf)));
483       continue;
484     }
485
486     getnameinfo (ai_ptr->ai_addr, ai_ptr->ai_addrlen,
487         dbg_node, sizeof (dbg_node), dbg_service, sizeof (dbg_service),
488         NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV);
489     DEBUG ("statsd plugin: Trying to bind to [%s]:%s ...", dbg_node, dbg_service);
490
491     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
492     if (status != 0)
493     {
494       char errbuf[1024];
495       ERROR ("statsd plugin: bind(2) failed: %s",
496           sstrerror (errno, errbuf, sizeof (errbuf)));
497       close (fd);
498       continue;
499     }
500
501     tmp = realloc (fds, sizeof (*fds) * (fds_num + 1));
502     if (tmp == NULL)
503     {
504       ERROR ("statsd plugin: realloc failed.");
505       continue;
506     }
507     fds = tmp;
508     tmp = fds + fds_num;
509     fds_num++;
510
511     memset (tmp, 0, sizeof (*tmp));
512     tmp->fd = fd;
513     tmp->events = POLLIN | POLLPRI;
514   }
515
516   freeaddrinfo (ai_list);
517
518   if (fds_num == 0)
519   {
520     ERROR ("statsd plugin: Unable to create listening socket for [%s]:%s.",
521         (node != NULL) ? node : "::", service);
522     return (ENOENT);
523   }
524
525   *ret_fds = fds;
526   *ret_fds_num = fds_num;
527   return (0);
528 } /* }}} int statsd_network_init */
529
530 static void *statsd_network_thread (void *args) /* {{{ */
531 {
532   struct pollfd *fds = NULL;
533   size_t fds_num = 0;
534   int status;
535   size_t i;
536
537   status = statsd_network_init (&fds, &fds_num);
538   if (status != 0)
539   {
540     ERROR ("statsd plugin: Unable to open listening sockets.");
541     pthread_exit ((void *) 0);
542   }
543
544   while (!network_thread_shutdown)
545   {
546     status = poll (fds, (nfds_t) fds_num, /* timeout = */ -1);
547     if (status < 0)
548     {
549       char errbuf[1024];
550
551       if ((errno == EINTR) || (errno == EAGAIN))
552         continue;
553
554       ERROR ("statsd plugin: poll(2) failed: %s",
555           sstrerror (errno, errbuf, sizeof (errbuf)));
556       break;
557     }
558
559     for (i = 0; i < fds_num; i++)
560     {
561       if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
562         continue;
563
564       statsd_network_read (fds[i].fd);
565       fds[i].revents = 0;
566     }
567   } /* while (!network_thread_shutdown) */
568
569   /* Clean up */
570   for (i = 0; i < fds_num; i++)
571     close (fds[i].fd);
572   sfree (fds);
573
574   return ((void *) 0);
575 } /* }}} void *statsd_network_thread */
576
577 static int statsd_config_timer_percentile (oconfig_item_t *ci) /* {{{ */
578 {
579   double percent = NAN;
580   double *tmp;
581   int status;
582
583   status = cf_util_get_double (ci, &percent);
584   if (status != 0)
585     return (status);
586
587   if ((percent <= 0.0) || (percent >= 100))
588   {
589     ERROR ("statsd plugin: The value for \"%s\" must be between 0 and 100, "
590         "exclusively.", ci->key);
591     return (ERANGE);
592   }
593
594   tmp = realloc (conf_timer_percentile,
595       sizeof (*conf_timer_percentile) * (conf_timer_percentile_num + 1));
596   if (tmp == NULL)
597   {
598     ERROR ("statsd plugin: realloc failed.");
599     return (ENOMEM);
600   }
601   conf_timer_percentile = tmp;
602   conf_timer_percentile[conf_timer_percentile_num] = percent;
603   conf_timer_percentile_num++;
604
605   return (0);
606 } /* }}} int statsd_config_timer_percentile */
607
608 static int statsd_config (oconfig_item_t *ci) /* {{{ */
609 {
610   int i;
611
612   for (i = 0; i < ci->children_num; i++)
613   {
614     oconfig_item_t *child = ci->children + i;
615
616     if (strcasecmp ("Host", child->key) == 0)
617       cf_util_get_string (child, &conf_node);
618     else if (strcasecmp ("Port", child->key) == 0)
619       cf_util_get_service (child, &conf_service);
620     else if (strcasecmp ("DeleteCounters", child->key) == 0)
621       cf_util_get_boolean (child, &conf_delete_counters);
622     else if (strcasecmp ("DeleteTimers", child->key) == 0)
623       cf_util_get_boolean (child, &conf_delete_timers);
624     else if (strcasecmp ("DeleteGauges", child->key) == 0)
625       cf_util_get_boolean (child, &conf_delete_gauges);
626     else if (strcasecmp ("DeleteSets", child->key) == 0)
627       cf_util_get_boolean (child, &conf_delete_sets);
628     else if (strcasecmp ("TimerPercentile", child->key) == 0)
629       statsd_config_timer_percentile (child);
630     else
631       ERROR ("statsd plugin: The \"%s\" config option is not valid.",
632           child->key);
633   }
634
635   return (0);
636 } /* }}} int statsd_config */
637
638 static int statsd_init (void) /* {{{ */
639 {
640   pthread_mutex_lock (&metrics_lock);
641   if (metrics_tree == NULL)
642     metrics_tree = c_avl_create ((void *) strcmp);
643
644   if (!network_thread_running)
645   {
646     int status;
647
648     status = pthread_create (&network_thread,
649         /* attr = */ NULL,
650         statsd_network_thread,
651         /* args = */ NULL);
652     if (status != 0)
653     {
654       char errbuf[1024];
655       pthread_mutex_unlock (&metrics_lock);
656       ERROR ("statsd plugin: pthread_create failed: %s",
657           sstrerror (errno, errbuf, sizeof (errbuf)));
658       return (status);
659     }
660   }
661   network_thread_running = 1;
662
663   pthread_mutex_unlock (&metrics_lock);
664
665   return (0);
666 } /* }}} int statsd_init */
667
668 /* Must hold metrics_lock when calling this function. */
669 static int statsd_metric_clear_set_unsafe (statsd_metric_t *metric) /* {{{ */
670 {
671   void *key;
672   void *value;
673
674   if ((metric == NULL) || (metric->type != STATSD_SET))
675     return (EINVAL);
676
677   if (metric->set == NULL)
678     return (0);
679
680   while (c_avl_pick (metric->set, &key, &value) == 0)
681   {
682     sfree (key);
683     sfree (value);
684   }
685
686   return (0);
687 } /* }}} int statsd_metric_clear_set_unsafe */
688
689 /* Must hold metrics_lock when calling this function. */
690 static int statsd_metric_submit_unsafe (char const *name, /* {{{ */
691     statsd_metric_t const *metric)
692 {
693   value_t values[1];
694   value_list_t vl = VALUE_LIST_INIT;
695
696   vl.values = values;
697   vl.values_len = 1;
698   sstrncpy (vl.host, hostname_g, sizeof (vl.host));
699   sstrncpy (vl.plugin, "statsd", sizeof (vl.plugin));
700
701   if (metric->type == STATSD_GAUGE)
702     sstrncpy (vl.type, "gauge", sizeof (vl.type));
703   else if (metric->type == STATSD_TIMER)
704     sstrncpy (vl.type, "latency", sizeof (vl.type));
705   else if (metric->type == STATSD_SET)
706     sstrncpy (vl.type, "objects", sizeof (vl.type));
707   else /* if (metric->type == STATSD_COUNTER) */
708     sstrncpy (vl.type, "derive", sizeof (vl.type));
709
710   sstrncpy (vl.type_instance, name, sizeof (vl.type_instance));
711
712   if (metric->type == STATSD_GAUGE)
713     values[0].gauge = (gauge_t) metric->value;
714   else if (metric->type == STATSD_TIMER)
715   {
716     size_t i;
717
718     if (metric->updates_num == 0)
719       return (0);
720
721     vl.time = cdtime ();
722
723     ssnprintf (vl.type_instance, sizeof (vl.type_instance),
724         "%s-average", name);
725     values[0].gauge = CDTIME_T_TO_DOUBLE (
726         latency_counter_get_average (metric->latency));
727     plugin_dispatch_values (&vl);
728
729     for (i = 0; i < conf_timer_percentile_num; i++)
730     {
731       ssnprintf (vl.type_instance, sizeof (vl.type_instance),
732           "%s-percentile-%.0f", name, conf_timer_percentile[i]);
733       values[0].gauge = CDTIME_T_TO_DOUBLE (
734           latency_counter_get_percentile (
735             metric->latency, conf_timer_percentile[i]));
736       plugin_dispatch_values (&vl);
737     }
738
739     latency_counter_reset (metric->latency);
740     return (0);
741   }
742   else if (metric->type == STATSD_SET)
743   {
744     if (metric->set == NULL)
745       values[0].gauge = 0.0;
746     else
747       values[0].gauge = (gauge_t) c_avl_size (metric->set);
748   }
749   else
750     values[0].derive = (derive_t) metric->value;
751
752   return (plugin_dispatch_values (&vl));
753 } /* }}} int statsd_metric_submit_unsafe */
754
755 static int statsd_read (void) /* {{{ */
756 {
757   c_avl_iterator_t *iter;
758   char *name;
759   statsd_metric_t *metric;
760
761   char **to_be_deleted = NULL;
762   size_t to_be_deleted_num = 0;
763   size_t i;
764
765   pthread_mutex_lock (&metrics_lock);
766
767   if (metrics_tree == NULL)
768   {
769     pthread_mutex_unlock (&metrics_lock);
770     return (0);
771   }
772
773   iter = c_avl_get_iterator (metrics_tree);
774   while (c_avl_iterator_next (iter, (void *) &name, (void *) &metric) == 0)
775   {
776     if ((metric->updates_num == 0)
777         && ((conf_delete_counters && (metric->type == STATSD_COUNTER))
778           || (conf_delete_timers && (metric->type == STATSD_TIMER))
779           || (conf_delete_gauges && (metric->type == STATSD_GAUGE))
780           || (conf_delete_sets && (metric->type == STATSD_SET))))
781     {
782       DEBUG ("statsd plugin: Deleting metric \"%s\".", name);
783       strarray_add (&to_be_deleted, &to_be_deleted_num, name);
784       continue;
785     }
786
787     /* Names have a prefix, e.g. "c:", which determines the (statsd) type.
788      * Remove this here. */
789     statsd_metric_submit_unsafe (name + 2, metric);
790
791     /* Reset the metric. */
792     metric->updates_num = 0;
793     if (metric->type == STATSD_SET)
794       statsd_metric_clear_set_unsafe (metric);
795   }
796   c_avl_iterator_destroy (iter);
797
798   for (i = 0; i < to_be_deleted_num; i++)
799   {
800     int status;
801
802     status = c_avl_remove (metrics_tree, to_be_deleted[i],
803         (void *) &name, (void *) &metric);
804     if (status != 0)
805     {
806       ERROR ("stats plugin: c_avl_remove (\"%s\") failed with status %i.",
807           to_be_deleted[i], status);
808       continue;
809     }
810
811     sfree (name);
812     sfree (metric);
813   }
814
815   pthread_mutex_unlock (&metrics_lock);
816
817   strarray_free (to_be_deleted, to_be_deleted_num);
818
819   return (0);
820 } /* }}} int statsd_read */
821
822 static int statsd_shutdown (void) /* {{{ */
823 {
824   void *key;
825   void *value;
826
827   pthread_mutex_lock (&metrics_lock);
828
829   if (network_thread_running)
830   {
831     network_thread_shutdown = 1;
832     pthread_kill (network_thread, SIGTERM);
833     pthread_join (network_thread, /* retval = */ NULL);
834   }
835   network_thread_running = 0;
836
837   while (c_avl_pick (metrics_tree, &key, &value) == 0)
838   {
839     sfree (key);
840     sfree (value);
841   }
842   c_avl_destroy (metrics_tree);
843   metrics_tree = NULL;
844
845   sfree (conf_node);
846   sfree (conf_service);
847
848   pthread_mutex_unlock (&metrics_lock);
849
850   return (0);
851 } /* }}} int statsd_shutdown */
852
853 void module_register (void)
854 {
855   plugin_register_complex_config ("statsd", statsd_config);
856   plugin_register_init ("statsd", statsd_init);
857   plugin_register_read ("statsd", statsd_read);
858   plugin_register_shutdown ("statsd", statsd_shutdown);
859 }
860
861 /* vim: set sw=2 sts=2 et fdm=marker : */