Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / statsd.c
1 /**
2  * collectd - src/statsd.c
3  * Copyright (C) 2013       Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_avltree.h"
32 #include "utils_latency.h"
33
34 #include <netdb.h>
35 #include <poll.h>
36 #include <sys/types.h>
37
38 /* AIX doesn't have MSG_DONTWAIT */
39 #ifndef MSG_DONTWAIT
40 #define MSG_DONTWAIT MSG_NONBLOCK
41 #endif
42
43 #ifndef STATSD_DEFAULT_NODE
44 #define STATSD_DEFAULT_NODE NULL
45 #endif
46
47 #ifndef STATSD_DEFAULT_SERVICE
48 #define STATSD_DEFAULT_SERVICE "8125"
49 #endif
50
51 enum metric_type_e { STATSD_COUNTER, STATSD_TIMER, STATSD_GAUGE, STATSD_SET };
52 typedef enum metric_type_e metric_type_t;
53
54 struct statsd_metric_s {
55   metric_type_t type;
56   double value;
57   derive_t counter;
58   latency_counter_t *latency;
59   c_avl_tree_t *set;
60   unsigned long updates_num;
61 };
62 typedef struct statsd_metric_s statsd_metric_t;
63
64 static c_avl_tree_t *metrics_tree = NULL;
65 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 static pthread_t network_thread;
68 static _Bool network_thread_running = 0;
69 static _Bool network_thread_shutdown = 0;
70
71 static char *conf_node = NULL;
72 static char *conf_service = NULL;
73
74 static _Bool conf_delete_counters = 0;
75 static _Bool conf_delete_timers = 0;
76 static _Bool conf_delete_gauges = 0;
77 static _Bool conf_delete_sets = 0;
78
79 static double *conf_timer_percentile = NULL;
80 static size_t conf_timer_percentile_num = 0;
81
82 static _Bool conf_counter_sum = 0;
83 static _Bool conf_timer_lower = 0;
84 static _Bool conf_timer_upper = 0;
85 static _Bool conf_timer_sum = 0;
86 static _Bool conf_timer_count = 0;
87
88 /* Must hold metrics_lock when calling this function. */
89 static statsd_metric_t *statsd_metric_lookup_unsafe(char const *name, /* {{{ */
90                                                     metric_type_t type) {
91   char key[DATA_MAX_NAME_LEN + 2];
92   char *key_copy;
93   statsd_metric_t *metric;
94   int status;
95
96   switch (type) {
97   case STATSD_COUNTER:
98     key[0] = 'c';
99     break;
100   case STATSD_TIMER:
101     key[0] = 't';
102     break;
103   case STATSD_GAUGE:
104     key[0] = 'g';
105     break;
106   case STATSD_SET:
107     key[0] = 's';
108     break;
109   default:
110     return NULL;
111   }
112
113   key[1] = ':';
114   sstrncpy(&key[2], name, sizeof(key) - 2);
115
116   status = c_avl_get(metrics_tree, key, (void *)&metric);
117   if (status == 0)
118     return metric;
119
120   key_copy = strdup(key);
121   if (key_copy == NULL) {
122     ERROR("statsd plugin: strdup failed.");
123     return NULL;
124   }
125
126   metric = calloc(1, sizeof(*metric));
127   if (metric == NULL) {
128     ERROR("statsd plugin: calloc failed.");
129     sfree(key_copy);
130     return NULL;
131   }
132
133   metric->type = type;
134   metric->latency = NULL;
135   metric->set = NULL;
136
137   status = c_avl_insert(metrics_tree, key_copy, metric);
138   if (status != 0) {
139     ERROR("statsd plugin: c_avl_insert failed.");
140     sfree(key_copy);
141     sfree(metric);
142     return NULL;
143   }
144
145   return metric;
146 } /* }}} statsd_metric_lookup_unsafe */
147
148 static int statsd_metric_set(char const *name, double value, /* {{{ */
149                              metric_type_t type) {
150   statsd_metric_t *metric;
151
152   pthread_mutex_lock(&metrics_lock);
153
154   metric = statsd_metric_lookup_unsafe(name, type);
155   if (metric == NULL) {
156     pthread_mutex_unlock(&metrics_lock);
157     return -1;
158   }
159
160   metric->value = value;
161   metric->updates_num++;
162
163   pthread_mutex_unlock(&metrics_lock);
164
165   return 0;
166 } /* }}} int statsd_metric_set */
167
168 static int statsd_metric_add(char const *name, double delta, /* {{{ */
169                              metric_type_t type) {
170   statsd_metric_t *metric;
171
172   pthread_mutex_lock(&metrics_lock);
173
174   metric = statsd_metric_lookup_unsafe(name, type);
175   if (metric == NULL) {
176     pthread_mutex_unlock(&metrics_lock);
177     return -1;
178   }
179
180   metric->value += delta;
181   metric->updates_num++;
182
183   pthread_mutex_unlock(&metrics_lock);
184
185   return 0;
186 } /* }}} int statsd_metric_add */
187
188 static void statsd_metric_free(statsd_metric_t *metric) /* {{{ */
189 {
190   if (metric == NULL)
191     return;
192
193   if (metric->latency != NULL) {
194     latency_counter_destroy(metric->latency);
195     metric->latency = NULL;
196   }
197
198   if (metric->set != NULL) {
199     void *key;
200     void *value;
201
202     while (c_avl_pick(metric->set, &key, &value) == 0) {
203       sfree(key);
204       assert(value == NULL);
205     }
206
207     c_avl_destroy(metric->set);
208     metric->set = NULL;
209   }
210
211   sfree(metric);
212 } /* }}} void statsd_metric_free */
213
214 static int statsd_parse_value(char const *str, value_t *ret_value) /* {{{ */
215 {
216   char *endptr = NULL;
217
218   ret_value->gauge = (gauge_t)strtod(str, &endptr);
219   if ((str == endptr) || ((endptr != NULL) && (*endptr != 0)))
220     return -1;
221
222   return 0;
223 } /* }}} int statsd_parse_value */
224
225 static int statsd_handle_counter(char const *name, /* {{{ */
226                                  char const *value_str, char const *extra) {
227   value_t value;
228   value_t scale;
229   int status;
230
231   if ((extra != NULL) && (extra[0] != '@'))
232     return -1;
233
234   scale.gauge = 1.0;
235   if (extra != NULL) {
236     status = statsd_parse_value(extra + 1, &scale);
237     if (status != 0)
238       return status;
239
240     if (!isfinite(scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
241       return -1;
242   }
243
244   value.gauge = 1.0;
245   status = statsd_parse_value(value_str, &value);
246   if (status != 0)
247     return status;
248
249   /* Changes to the counter are added to (statsd_metric_t*)->value. ->counter is
250    * only updated in statsd_metric_submit_unsafe(). */
251   return statsd_metric_add(name, (double)(value.gauge / scale.gauge),
252                            STATSD_COUNTER);
253 } /* }}} int statsd_handle_counter */
254
255 static int statsd_handle_gauge(char const *name, /* {{{ */
256                                char const *value_str) {
257   value_t value;
258   int status;
259
260   value.gauge = 0;
261   status = statsd_parse_value(value_str, &value);
262   if (status != 0)
263     return status;
264
265   if ((value_str[0] == '+') || (value_str[0] == '-'))
266     return statsd_metric_add(name, (double)value.gauge, STATSD_GAUGE);
267   else
268     return statsd_metric_set(name, (double)value.gauge, STATSD_GAUGE);
269 } /* }}} int statsd_handle_gauge */
270
271 static int statsd_handle_timer(char const *name, /* {{{ */
272                                char const *value_str, char const *extra) {
273   statsd_metric_t *metric;
274   value_t value_ms;
275   value_t scale;
276   cdtime_t value;
277   int status;
278
279   if ((extra != NULL) && (extra[0] != '@'))
280     return -1;
281
282   scale.gauge = 1.0;
283   if (extra != NULL) {
284     status = statsd_parse_value(extra + 1, &scale);
285     if (status != 0)
286       return status;
287
288     if (!isfinite(scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
289       return -1;
290   }
291
292   value_ms.derive = 0;
293   status = statsd_parse_value(value_str, &value_ms);
294   if (status != 0)
295     return status;
296
297   value = MS_TO_CDTIME_T(value_ms.gauge / scale.gauge);
298
299   pthread_mutex_lock(&metrics_lock);
300
301   metric = statsd_metric_lookup_unsafe(name, STATSD_TIMER);
302   if (metric == NULL) {
303     pthread_mutex_unlock(&metrics_lock);
304     return -1;
305   }
306
307   if (metric->latency == NULL)
308     metric->latency = latency_counter_create();
309   if (metric->latency == NULL) {
310     pthread_mutex_unlock(&metrics_lock);
311     return -1;
312   }
313
314   latency_counter_add(metric->latency, value);
315   metric->updates_num++;
316
317   pthread_mutex_unlock(&metrics_lock);
318   return 0;
319 } /* }}} int statsd_handle_timer */
320
321 static int statsd_handle_set(char const *name, /* {{{ */
322                              char const *set_key_orig) {
323   statsd_metric_t *metric = NULL;
324   char *set_key;
325   int status;
326
327   pthread_mutex_lock(&metrics_lock);
328
329   metric = statsd_metric_lookup_unsafe(name, STATSD_SET);
330   if (metric == NULL) {
331     pthread_mutex_unlock(&metrics_lock);
332     return -1;
333   }
334
335   /* Make sure metric->set exists. */
336   if (metric->set == NULL)
337     metric->set = c_avl_create((int (*)(const void *, const void *))strcmp);
338
339   if (metric->set == NULL) {
340     pthread_mutex_unlock(&metrics_lock);
341     ERROR("statsd plugin: c_avl_create failed.");
342     return -1;
343   }
344
345   set_key = strdup(set_key_orig);
346   if (set_key == NULL) {
347     pthread_mutex_unlock(&metrics_lock);
348     ERROR("statsd plugin: strdup failed.");
349     return -1;
350   }
351
352   status = c_avl_insert(metric->set, set_key, /* value = */ NULL);
353   if (status < 0) {
354     pthread_mutex_unlock(&metrics_lock);
355     if (status < 0)
356       ERROR("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
357             set_key, status);
358     sfree(set_key);
359     return -1;
360   } else if (status > 0) /* key already exists */
361   {
362     sfree(set_key);
363   }
364
365   metric->updates_num++;
366
367   pthread_mutex_unlock(&metrics_lock);
368   return 0;
369 } /* }}} int statsd_handle_set */
370
371 static int statsd_parse_line(char *buffer) /* {{{ */
372 {
373   char *name = buffer;
374   char *value;
375   char *type;
376   char *extra;
377
378   type = strchr(name, '|');
379   if (type == NULL)
380     return -1;
381   *type = 0;
382   type++;
383
384   value = strrchr(name, ':');
385   if (value == NULL)
386     return -1;
387   *value = 0;
388   value++;
389
390   extra = strchr(type, '|');
391   if (extra != NULL) {
392     *extra = 0;
393     extra++;
394   }
395
396   if (strcmp("c", type) == 0)
397     return statsd_handle_counter(name, value, extra);
398   else if (strcmp("ms", type) == 0)
399     return statsd_handle_timer(name, value, extra);
400
401   /* extra is only valid for counters and timers */
402   if (extra != NULL)
403     return -1;
404
405   if (strcmp("g", type) == 0)
406     return statsd_handle_gauge(name, value);
407   else if (strcmp("s", type) == 0)
408     return statsd_handle_set(name, value);
409   else
410     return -1;
411 } /* }}} void statsd_parse_line */
412
413 static void statsd_parse_buffer(char *buffer) /* {{{ */
414 {
415   while (buffer != NULL) {
416     char orig[64];
417     char *next;
418     int status;
419
420     next = strchr(buffer, '\n');
421     if (next != NULL) {
422       *next = 0;
423       next++;
424     }
425
426     if (*buffer == 0) {
427       buffer = next;
428       continue;
429     }
430
431     sstrncpy(orig, buffer, sizeof(orig));
432
433     status = statsd_parse_line(buffer);
434     if (status != 0)
435       ERROR("statsd plugin: Unable to parse line: \"%s\"", orig);
436
437     buffer = next;
438   }
439 } /* }}} void statsd_parse_buffer */
440
441 static void statsd_network_read(int fd) /* {{{ */
442 {
443   char buffer[4096];
444   size_t buffer_size;
445   ssize_t status;
446
447   status = recv(fd, buffer, sizeof(buffer), /* flags = */ MSG_DONTWAIT);
448   if (status < 0) {
449     char errbuf[1024];
450
451     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
452       return;
453
454     ERROR("statsd plugin: recv(2) failed: %s",
455           sstrerror(errno, errbuf, sizeof(errbuf)));
456     return;
457   }
458
459   buffer_size = (size_t)status;
460   if (buffer_size >= sizeof(buffer))
461     buffer_size = sizeof(buffer) - 1;
462   buffer[buffer_size] = 0;
463
464   statsd_parse_buffer(buffer);
465 } /* }}} void statsd_network_read */
466
467 static int statsd_network_init(struct pollfd **ret_fds, /* {{{ */
468                                size_t *ret_fds_num) {
469   struct pollfd *fds = NULL;
470   size_t fds_num = 0;
471
472   struct addrinfo *ai_list;
473   int status;
474
475   char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
476   char const *service =
477       (conf_service != NULL) ? conf_service : STATSD_DEFAULT_SERVICE;
478
479   struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
480                               .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
481                               .ai_socktype = SOCK_DGRAM};
482
483   status = getaddrinfo(node, service, &ai_hints, &ai_list);
484   if (status != 0) {
485     ERROR("statsd plugin: getaddrinfo (\"%s\", \"%s\") failed: %s", node,
486           service, gai_strerror(status));
487     return status;
488   }
489
490   for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
491        ai_ptr = ai_ptr->ai_next) {
492     int fd;
493     struct pollfd *tmp;
494
495     char dbg_node[NI_MAXHOST];
496     char dbg_service[NI_MAXSERV];
497
498     fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
499     if (fd < 0) {
500       char errbuf[1024];
501       ERROR("statsd plugin: socket(2) failed: %s",
502             sstrerror(errno, errbuf, sizeof(errbuf)));
503       continue;
504     }
505
506     getnameinfo(ai_ptr->ai_addr, ai_ptr->ai_addrlen, dbg_node, sizeof(dbg_node),
507                 dbg_service, sizeof(dbg_service),
508                 NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV);
509     DEBUG("statsd plugin: Trying to bind to [%s]:%s ...", dbg_node,
510           dbg_service);
511
512     status = bind(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
513     if (status != 0) {
514       char errbuf[1024];
515       ERROR("statsd plugin: bind(2) failed: %s",
516             sstrerror(errno, errbuf, sizeof(errbuf)));
517       close(fd);
518       continue;
519     }
520
521     tmp = realloc(fds, sizeof(*fds) * (fds_num + 1));
522     if (tmp == NULL) {
523       ERROR("statsd plugin: realloc failed.");
524       close(fd);
525       continue;
526     }
527     fds = tmp;
528     tmp = fds + fds_num;
529     fds_num++;
530
531     memset(tmp, 0, sizeof(*tmp));
532     tmp->fd = fd;
533     tmp->events = POLLIN | POLLPRI;
534   }
535
536   freeaddrinfo(ai_list);
537
538   if (fds_num == 0) {
539     ERROR("statsd plugin: Unable to create listening socket for [%s]:%s.",
540           (node != NULL) ? node : "::", service);
541     return ENOENT;
542   }
543
544   *ret_fds = fds;
545   *ret_fds_num = fds_num;
546   return 0;
547 } /* }}} int statsd_network_init */
548
549 static void *statsd_network_thread(void *args) /* {{{ */
550 {
551   struct pollfd *fds = NULL;
552   size_t fds_num = 0;
553   int status;
554
555   status = statsd_network_init(&fds, &fds_num);
556   if (status != 0) {
557     ERROR("statsd plugin: Unable to open listening sockets.");
558     pthread_exit((void *)0);
559   }
560
561   while (!network_thread_shutdown) {
562     status = poll(fds, (nfds_t)fds_num, /* timeout = */ -1);
563     if (status < 0) {
564       char errbuf[1024];
565
566       if ((errno == EINTR) || (errno == EAGAIN))
567         continue;
568
569       ERROR("statsd plugin: poll(2) failed: %s",
570             sstrerror(errno, errbuf, sizeof(errbuf)));
571       break;
572     }
573
574     for (size_t i = 0; i < fds_num; i++) {
575       if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
576         continue;
577
578       statsd_network_read(fds[i].fd);
579       fds[i].revents = 0;
580     }
581   } /* while (!network_thread_shutdown) */
582
583   /* Clean up */
584   for (size_t i = 0; i < fds_num; i++)
585     close(fds[i].fd);
586   sfree(fds);
587
588   return (void *)0;
589 } /* }}} void *statsd_network_thread */
590
591 static int statsd_config_timer_percentile(oconfig_item_t *ci) /* {{{ */
592 {
593   double percent = NAN;
594   double *tmp;
595   int status;
596
597   status = cf_util_get_double(ci, &percent);
598   if (status != 0)
599     return status;
600
601   if ((percent <= 0.0) || (percent >= 100)) {
602     ERROR("statsd plugin: The value for \"%s\" must be between 0 and 100, "
603           "exclusively.",
604           ci->key);
605     return ERANGE;
606   }
607
608   tmp =
609       realloc(conf_timer_percentile,
610               sizeof(*conf_timer_percentile) * (conf_timer_percentile_num + 1));
611   if (tmp == NULL) {
612     ERROR("statsd plugin: realloc failed.");
613     return ENOMEM;
614   }
615   conf_timer_percentile = tmp;
616   conf_timer_percentile[conf_timer_percentile_num] = percent;
617   conf_timer_percentile_num++;
618
619   return 0;
620 } /* }}} int statsd_config_timer_percentile */
621
622 static int statsd_config(oconfig_item_t *ci) /* {{{ */
623 {
624   for (int i = 0; i < ci->children_num; i++) {
625     oconfig_item_t *child = ci->children + i;
626
627     if (strcasecmp("Host", child->key) == 0)
628       cf_util_get_string(child, &conf_node);
629     else if (strcasecmp("Port", child->key) == 0)
630       cf_util_get_service(child, &conf_service);
631     else if (strcasecmp("DeleteCounters", child->key) == 0)
632       cf_util_get_boolean(child, &conf_delete_counters);
633     else if (strcasecmp("DeleteTimers", child->key) == 0)
634       cf_util_get_boolean(child, &conf_delete_timers);
635     else if (strcasecmp("DeleteGauges", child->key) == 0)
636       cf_util_get_boolean(child, &conf_delete_gauges);
637     else if (strcasecmp("DeleteSets", child->key) == 0)
638       cf_util_get_boolean(child, &conf_delete_sets);
639     else if (strcasecmp("CounterSum", child->key) == 0)
640       cf_util_get_boolean(child, &conf_counter_sum);
641     else if (strcasecmp("TimerLower", child->key) == 0)
642       cf_util_get_boolean(child, &conf_timer_lower);
643     else if (strcasecmp("TimerUpper", child->key) == 0)
644       cf_util_get_boolean(child, &conf_timer_upper);
645     else if (strcasecmp("TimerSum", child->key) == 0)
646       cf_util_get_boolean(child, &conf_timer_sum);
647     else if (strcasecmp("TimerCount", child->key) == 0)
648       cf_util_get_boolean(child, &conf_timer_count);
649     else if (strcasecmp("TimerPercentile", child->key) == 0)
650       statsd_config_timer_percentile(child);
651     else
652       ERROR("statsd plugin: The \"%s\" config option is not valid.",
653             child->key);
654   }
655
656   return 0;
657 } /* }}} int statsd_config */
658
659 static int statsd_init(void) /* {{{ */
660 {
661   pthread_mutex_lock(&metrics_lock);
662   if (metrics_tree == NULL)
663     metrics_tree = c_avl_create((int (*)(const void *, const void *))strcmp);
664
665   if (!network_thread_running) {
666     int status;
667
668     status = pthread_create(&network_thread,
669                             /* attr = */ NULL, statsd_network_thread,
670                             /* args = */ NULL);
671     if (status != 0) {
672       char errbuf[1024];
673       pthread_mutex_unlock(&metrics_lock);
674       ERROR("statsd plugin: pthread_create failed: %s",
675             sstrerror(errno, errbuf, sizeof(errbuf)));
676       return status;
677     }
678   }
679   network_thread_running = 1;
680
681   pthread_mutex_unlock(&metrics_lock);
682
683   return 0;
684 } /* }}} int statsd_init */
685
686 /* Must hold metrics_lock when calling this function. */
687 static int statsd_metric_clear_set_unsafe(statsd_metric_t *metric) /* {{{ */
688 {
689   void *key;
690   void *value;
691
692   if ((metric == NULL) || (metric->type != STATSD_SET))
693     return EINVAL;
694
695   if (metric->set == NULL)
696     return 0;
697
698   while (c_avl_pick(metric->set, &key, &value) == 0) {
699     sfree(key);
700     sfree(value);
701   }
702
703   return 0;
704 } /* }}} int statsd_metric_clear_set_unsafe */
705
706 /* Must hold metrics_lock when calling this function. */
707 static int statsd_metric_submit_unsafe(char const *name,
708                                        statsd_metric_t *metric) /* {{{ */
709 {
710   value_list_t vl = VALUE_LIST_INIT;
711
712   vl.values = &(value_t){.gauge = NAN};
713   vl.values_len = 1;
714   sstrncpy(vl.plugin, "statsd", sizeof(vl.plugin));
715
716   if (metric->type == STATSD_GAUGE)
717     sstrncpy(vl.type, "gauge", sizeof(vl.type));
718   else if (metric->type == STATSD_TIMER)
719     sstrncpy(vl.type, "latency", sizeof(vl.type));
720   else if (metric->type == STATSD_SET)
721     sstrncpy(vl.type, "objects", sizeof(vl.type));
722   else /* if (metric->type == STATSD_COUNTER) */
723     sstrncpy(vl.type, "derive", sizeof(vl.type));
724
725   sstrncpy(vl.type_instance, name, sizeof(vl.type_instance));
726
727   if (metric->type == STATSD_GAUGE)
728     vl.values[0].gauge = (gauge_t)metric->value;
729   else if (metric->type == STATSD_TIMER) {
730     _Bool have_events = (metric->updates_num > 0);
731
732     /* Make sure all timer metrics share the *same* timestamp. */
733     vl.time = cdtime();
734
735     snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-average", name);
736     vl.values[0].gauge =
737         have_events
738             ? CDTIME_T_TO_DOUBLE(latency_counter_get_average(metric->latency))
739             : NAN;
740     plugin_dispatch_values(&vl);
741
742     if (conf_timer_lower) {
743       snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-lower", name);
744       vl.values[0].gauge =
745           have_events
746               ? CDTIME_T_TO_DOUBLE(latency_counter_get_min(metric->latency))
747               : NAN;
748       plugin_dispatch_values(&vl);
749     }
750
751     if (conf_timer_upper) {
752       snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-upper", name);
753       vl.values[0].gauge =
754           have_events
755               ? CDTIME_T_TO_DOUBLE(latency_counter_get_max(metric->latency))
756               : NAN;
757       plugin_dispatch_values(&vl);
758     }
759
760     if (conf_timer_sum) {
761       snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-sum", name);
762       vl.values[0].gauge =
763           have_events
764               ? CDTIME_T_TO_DOUBLE(latency_counter_get_sum(metric->latency))
765               : NAN;
766       plugin_dispatch_values(&vl);
767     }
768
769     for (size_t i = 0; i < conf_timer_percentile_num; i++) {
770       snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-percentile-%.0f",
771                name, conf_timer_percentile[i]);
772       vl.values[0].gauge =
773           have_events ? CDTIME_T_TO_DOUBLE(latency_counter_get_percentile(
774                             metric->latency, conf_timer_percentile[i]))
775                       : NAN;
776       plugin_dispatch_values(&vl);
777     }
778
779     /* Keep this at the end, since vl.type is set to "gauge" here. The
780      * vl.type's above are implicitly set to "latency". */
781     if (conf_timer_count) {
782       sstrncpy(vl.type, "gauge", sizeof(vl.type));
783       snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-count", name);
784       vl.values[0].gauge = latency_counter_get_num(metric->latency);
785       plugin_dispatch_values(&vl);
786     }
787
788     latency_counter_reset(metric->latency);
789     return 0;
790   } else if (metric->type == STATSD_SET) {
791     if (metric->set == NULL)
792       vl.values[0].gauge = 0.0;
793     else
794       vl.values[0].gauge = (gauge_t)c_avl_size(metric->set);
795   } else { /* STATSD_COUNTER */
796     gauge_t delta = nearbyint(metric->value);
797
798     /* Etsy's statsd writes counters as two metrics: a rate and the change since
799      * the last write. Since collectd does not reset its DERIVE metrics to zero,
800      * this makes little sense, but we're dispatching a "count" metric here
801      * anyway - if requested by the user - for compatibility reasons. */
802     if (conf_counter_sum) {
803       sstrncpy(vl.type, "count", sizeof(vl.type));
804       vl.values[0].gauge = delta;
805       plugin_dispatch_values(&vl);
806
807       /* restore vl.type */
808       sstrncpy(vl.type, "derive", sizeof(vl.type));
809     }
810
811     /* Rather than resetting value to zero, subtract delta so we correctly keep
812      * track of residuals. */
813     metric->value -= delta;
814     metric->counter += (derive_t)delta;
815
816     vl.values[0].derive = metric->counter;
817   }
818
819   return plugin_dispatch_values(&vl);
820 } /* }}} int statsd_metric_submit_unsafe */
821
822 static int statsd_read(void) /* {{{ */
823 {
824   c_avl_iterator_t *iter;
825   char *name;
826   statsd_metric_t *metric;
827
828   char **to_be_deleted = NULL;
829   size_t to_be_deleted_num = 0;
830
831   pthread_mutex_lock(&metrics_lock);
832
833   if (metrics_tree == NULL) {
834     pthread_mutex_unlock(&metrics_lock);
835     return 0;
836   }
837
838   iter = c_avl_get_iterator(metrics_tree);
839   while (c_avl_iterator_next(iter, (void *)&name, (void *)&metric) == 0) {
840     if ((metric->updates_num == 0) &&
841         ((conf_delete_counters && (metric->type == STATSD_COUNTER)) ||
842          (conf_delete_timers && (metric->type == STATSD_TIMER)) ||
843          (conf_delete_gauges && (metric->type == STATSD_GAUGE)) ||
844          (conf_delete_sets && (metric->type == STATSD_SET)))) {
845       DEBUG("statsd plugin: Deleting metric \"%s\".", name);
846       strarray_add(&to_be_deleted, &to_be_deleted_num, name);
847       continue;
848     }
849
850     /* Names have a prefix, e.g. "c:", which determines the (statsd) type.
851      * Remove this here. */
852     statsd_metric_submit_unsafe(name + 2, metric);
853
854     /* Reset the metric. */
855     metric->updates_num = 0;
856     if (metric->type == STATSD_SET)
857       statsd_metric_clear_set_unsafe(metric);
858   }
859   c_avl_iterator_destroy(iter);
860
861   for (size_t i = 0; i < to_be_deleted_num; i++) {
862     int status;
863
864     status = c_avl_remove(metrics_tree, to_be_deleted[i], (void *)&name,
865                           (void *)&metric);
866     if (status != 0) {
867       ERROR("stats plugin: c_avl_remove (\"%s\") failed with status %i.",
868             to_be_deleted[i], status);
869       continue;
870     }
871
872     sfree(name);
873     statsd_metric_free(metric);
874   }
875
876   pthread_mutex_unlock(&metrics_lock);
877
878   strarray_free(to_be_deleted, to_be_deleted_num);
879
880   return 0;
881 } /* }}} int statsd_read */
882
883 static int statsd_shutdown(void) /* {{{ */
884 {
885   void *key;
886   void *value;
887
888   if (network_thread_running) {
889     network_thread_shutdown = 1;
890     pthread_kill(network_thread, SIGTERM);
891     pthread_join(network_thread, /* retval = */ NULL);
892   }
893   network_thread_running = 0;
894
895   pthread_mutex_lock(&metrics_lock);
896
897   while (c_avl_pick(metrics_tree, &key, &value) == 0) {
898     sfree(key);
899     statsd_metric_free(value);
900   }
901   c_avl_destroy(metrics_tree);
902   metrics_tree = NULL;
903
904   sfree(conf_node);
905   sfree(conf_service);
906
907   pthread_mutex_unlock(&metrics_lock);
908
909   return 0;
910 } /* }}} int statsd_shutdown */
911
912 void module_register(void) {
913   plugin_register_complex_config("statsd", statsd_config);
914   plugin_register_init("statsd", statsd_init);
915   plugin_register_read("statsd", statsd_read);
916   plugin_register_shutdown("statsd", statsd_shutdown);
917 }