write_tsdb plugin: Rename options to "Resolve{Interval,Jitter}".
[collectd.git] / src / write_tsdb.c
1 /**
2  * collectd - src/write_tsdb.c
3  * Copyright (C) 2012       Pierre-Yves Ritschard
4  * Copyright (C) 2011       Scott Sanders
5  * Copyright (C) 2009       Paul Sadauskas
6  * Copyright (C) 2009       Doug MacEachern
7  * Copyright (C) 2007-2012  Florian octo Forster
8  * Copyright (C) 2013-2014  Limelight Networks, Inc.
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; only version 2 of the License is applicable.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License along
19  * with this program; if not, write to the Free Software Foundation, Inc.,
20  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
21  *
22  * Based on the write_graphite plugin. Authors:
23  *   Florian octo Forster <octo at collectd.org>
24  *   Doug MacEachern <dougm at hyperic.com>
25  *   Paul Sadauskas <psadauskas at gmail.com>
26  *   Scott Sanders <scott at jssjr.com>
27  *   Pierre-Yves Ritschard <pyr at spootnik.org>
28  * write_tsdb Authors:
29  *   Brett Hawn <bhawn at llnw.com>
30  *   Kevin Bowling <kbowling@llnw.com>
31  **/
32
33 /* write_tsdb plugin configuation example
34  *
35  * <Plugin write_tsdb>
36  *   <Node>
37  *     Host "localhost"
38  *     Port "4242"
39  *     HostTags "status=production deviceclass=www"
40  *   </Node>
41  * </Plugin>
42  */
43
44 #include "collectd.h"
45
46 #include "common.h"
47 #include "plugin.h"
48
49 #include "utils_cache.h"
50
51 #include <netdb.h>
52
53 #ifndef WT_DEFAULT_NODE
54 #define WT_DEFAULT_NODE "localhost"
55 #endif
56
57 #ifndef WT_DEFAULT_SERVICE
58 #define WT_DEFAULT_SERVICE "4242"
59 #endif
60
61 #ifndef WT_DEFAULT_ESCAPE
62 #define WT_DEFAULT_ESCAPE '.'
63 #endif
64
65 /* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */
66 #ifndef WT_SEND_BUF_SIZE
67 #define WT_SEND_BUF_SIZE 1428
68 #endif
69
70 /* Default configuration */
71
72 /* WRITE_TSDB_DEFAULT_DNS_TTL is the time we keep the dns cached info
73  * (seconds)
74  */
75 #define WRITE_TSDB_DEFAULT_DNS_TTL 600
76
77 /* WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL helps define the max random
78  * time we keep the dns cached info :
79  * min = 0
80  * max = WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL * get_plugin_interval()
81  */
82 #define WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL 15
83
84 /*
85  * Private variables
86  */
87 struct wt_callback {
88   struct addrinfo *ai;
89   cdtime_t ai_last_update;
90   int sock_fd;
91
92   char *node;
93   char *service;
94   char *host_tags;
95
96   _Bool store_rates;
97   _Bool always_append_ds;
98
99   char send_buf[WT_SEND_BUF_SIZE];
100   size_t send_buf_free;
101   size_t send_buf_fill;
102   cdtime_t send_buf_init_time;
103
104   pthread_mutex_t send_lock;
105
106   _Bool connect_failed_log_enabled;
107   int connect_dns_failed_attempts_remaining;
108   cdtime_t next_random_ttl;
109 };
110
111 static cdtime_t dnsttl = TIME_T_TO_CDTIME_T_STATIC(WRITE_TSDB_DEFAULT_DNS_TTL);
112 static cdtime_t dnsrandomttl = 0;
113
114 /*
115  * Functions
116  */
117 static void wt_reset_buffer(struct wt_callback *cb) {
118   memset(cb->send_buf, 0, sizeof(cb->send_buf));
119   cb->send_buf_free = sizeof(cb->send_buf);
120   cb->send_buf_fill = 0;
121   cb->send_buf_init_time = cdtime();
122 }
123
124 static int wt_send_buffer(struct wt_callback *cb) {
125   ssize_t status = 0;
126
127   status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf));
128   if (status < 0) {
129     char errbuf[1024];
130     ERROR("write_tsdb plugin: send failed with status %zi (%s)", status,
131           sstrerror(errno, errbuf, sizeof(errbuf)));
132
133     close(cb->sock_fd);
134     cb->sock_fd = -1;
135
136     return -1;
137   }
138
139   return 0;
140 }
141
142 /* NOTE: You must hold cb->send_lock when calling this function! */
143 static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) {
144   int status;
145
146   DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; "
147         "send_buf_fill = %zu;",
148         (double)timeout, cb->send_buf_fill);
149
150   /* timeout == 0  => flush unconditionally */
151   if (timeout > 0) {
152     cdtime_t now;
153
154     now = cdtime();
155     if ((cb->send_buf_init_time + timeout) > now)
156       return 0;
157   }
158
159   if (cb->send_buf_fill == 0) {
160     cb->send_buf_init_time = cdtime();
161     return 0;
162   }
163
164   status = wt_send_buffer(cb);
165   wt_reset_buffer(cb);
166
167   return status;
168 }
169
170 static cdtime_t new_random_ttl() {
171   if (dnsrandomttl == 0)
172     return 0;
173
174   time_t ttl = (time_t)(CDTIME_T_TO_DOUBLE(dnsrandomttl) * ((double)random()) /
175                         (((double)RAND_MAX) + 1.0));
176   return TIME_T_TO_CDTIME_T(ttl);
177 }
178
179 static int wt_callback_init(struct wt_callback *cb) {
180   int status;
181   cdtime_t now;
182
183   const char *node = cb->node ? cb->node : WT_DEFAULT_NODE;
184   const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE;
185
186   if (cb->sock_fd > 0)
187     return 0;
188
189   now = cdtime();
190   if (cb->ai) {
191     /* When we are here, we still have the IP in cache.
192      * If we have remaining attempts without calling the DNS, we update the
193      * last_update date so we keep the info until next time.
194      * If there is no more attempts, we need to flush the cache.
195      */
196
197     if ((cb->ai_last_update + dnsttl + cb->next_random_ttl) < now) {
198       cb->next_random_ttl = new_random_ttl();
199       if (cb->connect_dns_failed_attempts_remaining > 0) {
200         /* Warning : this is run under send_lock mutex.
201          * This is why we do not use another mutex here.
202          * */
203         cb->ai_last_update = now;
204         cb->connect_dns_failed_attempts_remaining--;
205       } else {
206         freeaddrinfo(cb->ai);
207         cb->ai = NULL;
208       }
209     }
210   }
211
212   if (cb->ai == NULL) {
213     if ((cb->ai_last_update + dnsttl + cb->next_random_ttl) >= now) {
214       DEBUG("write_tsdb plugin: too many getaddrinfo(%s, %s) failures", node,
215             service);
216       return (-1);
217     }
218     cb->ai_last_update = now;
219     cb->next_random_ttl = new_random_ttl();
220
221     struct addrinfo ai_hints = {
222         .ai_family = AF_UNSPEC,
223         .ai_flags = AI_ADDRCONFIG,
224         .ai_socktype = SOCK_STREAM,
225     };
226
227     status = getaddrinfo(node, service, &ai_hints, &cb->ai);
228     if (status != 0) {
229       if (cb->ai) {
230         freeaddrinfo(cb->ai);
231         cb->ai = NULL;
232       }
233       if (cb->connect_failed_log_enabled) {
234         ERROR("write_tsdb plugin: getaddrinfo(%s, %s) failed: %s", node,
235               service, gai_strerror(status));
236         cb->connect_failed_log_enabled = 0;
237       }
238       return -1;
239     }
240   }
241
242   assert(cb->ai != NULL);
243   for (struct addrinfo *ai = cb->ai; ai != NULL; ai = ai->ai_next) {
244     cb->sock_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
245     if (cb->sock_fd < 0)
246       continue;
247
248     set_sock_opts(cb->sock_fd);
249
250     status = connect(cb->sock_fd, ai->ai_addr, ai->ai_addrlen);
251     if (status != 0) {
252       close(cb->sock_fd);
253       cb->sock_fd = -1;
254       continue;
255     }
256
257     break;
258   }
259
260   if (cb->sock_fd < 0) {
261     char errbuf[1024];
262     ERROR("write_tsdb plugin: Connecting to %s:%s failed. "
263           "The last error was: %s",
264           node, service, sstrerror(errno, errbuf, sizeof(errbuf)));
265     return -1;
266   }
267
268   if (0 == cb->connect_failed_log_enabled) {
269     WARNING("write_tsdb plugin: Connecting to %s:%s succeeded.", node, service);
270     cb->connect_failed_log_enabled = 1;
271   }
272   cb->connect_dns_failed_attempts_remaining = 1;
273
274   wt_reset_buffer(cb);
275
276   return 0;
277 }
278
279 static void wt_callback_free(void *data) {
280   struct wt_callback *cb;
281
282   if (data == NULL)
283     return;
284
285   cb = data;
286
287   pthread_mutex_lock(&cb->send_lock);
288
289   wt_flush_nolock(0, cb);
290
291   close(cb->sock_fd);
292   cb->sock_fd = -1;
293
294   sfree(cb->node);
295   sfree(cb->service);
296   sfree(cb->host_tags);
297
298   pthread_mutex_destroy(&cb->send_lock);
299
300   sfree(cb);
301 }
302
303 static int wt_flush(cdtime_t timeout,
304                     const char *identifier __attribute__((unused)),
305                     user_data_t *user_data) {
306   struct wt_callback *cb;
307   int status;
308
309   if (user_data == NULL)
310     return -EINVAL;
311
312   cb = user_data->data;
313
314   pthread_mutex_lock(&cb->send_lock);
315
316   if (cb->sock_fd < 0) {
317     status = wt_callback_init(cb);
318     if (status != 0) {
319       ERROR("write_tsdb plugin: wt_callback_init failed.");
320       pthread_mutex_unlock(&cb->send_lock);
321       return -1;
322     }
323   }
324
325   status = wt_flush_nolock(timeout, cb);
326   pthread_mutex_unlock(&cb->send_lock);
327
328   return status;
329 }
330
331 static int wt_format_values(char *ret, size_t ret_len, int ds_num,
332                             const data_set_t *ds, const value_list_t *vl,
333                             _Bool store_rates) {
334   size_t offset = 0;
335   int status;
336   gauge_t *rates = NULL;
337
338   assert(0 == strcmp(ds->type, vl->type));
339
340   memset(ret, 0, ret_len);
341
342 #define BUFFER_ADD(...)                                                        \
343   do {                                                                         \
344     status = ssnprintf(ret + offset, ret_len - offset, __VA_ARGS__);           \
345     if (status < 1) {                                                          \
346       sfree(rates);                                                            \
347       return -1;                                                               \
348     } else if (((size_t)status) >= (ret_len - offset)) {                       \
349       sfree(rates);                                                            \
350       return -1;                                                               \
351     } else                                                                     \
352       offset += ((size_t)status);                                              \
353   } while (0)
354
355   if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
356     BUFFER_ADD(GAUGE_FORMAT, vl->values[ds_num].gauge);
357   else if (store_rates) {
358     if (rates == NULL)
359       rates = uc_get_rate(ds, vl);
360     if (rates == NULL) {
361       WARNING("format_values: "
362               "uc_get_rate failed.");
363       return -1;
364     }
365     BUFFER_ADD(GAUGE_FORMAT, rates[ds_num]);
366   } else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
367     BUFFER_ADD("%llu", vl->values[ds_num].counter);
368   else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
369     BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive);
370   else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
371     BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute);
372   else {
373     ERROR("format_values plugin: Unknown data source type: %i",
374           ds->ds[ds_num].type);
375     sfree(rates);
376     return -1;
377   }
378
379 #undef BUFFER_ADD
380
381   sfree(rates);
382   return 0;
383 }
384
385 static int wt_format_name(char *ret, int ret_len, const value_list_t *vl,
386                           const struct wt_callback *cb, const char *ds_name) {
387   int status;
388   char *temp = NULL;
389   const char *prefix = "";
390   const char *meta_prefix = "tsdb_prefix";
391
392   if (vl->meta) {
393     status = meta_data_get_string(vl->meta, meta_prefix, &temp);
394     if (status == -ENOENT) {
395       /* defaults to empty string */
396     } else if (status < 0) {
397       sfree(temp);
398       return status;
399     } else {
400       prefix = temp;
401     }
402   }
403
404   if (ds_name != NULL) {
405     if (vl->plugin_instance[0] == '\0') {
406       if (vl->type_instance[0] == '\0') {
407         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type,
408                   ds_name);
409       } else {
410         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type,
411                   vl->type_instance, ds_name);
412       }
413     } else { /* vl->plugin_instance != "" */
414       if (vl->type_instance[0] == '\0') {
415         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
416                   vl->plugin_instance, vl->type, ds_name);
417       } else {
418         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin,
419                   vl->plugin_instance, vl->type, vl->type_instance, ds_name);
420       }
421     }
422   } else { /* ds_name == NULL */
423     if (vl->plugin_instance[0] == '\0') {
424       if (vl->type_instance[0] == '\0') {
425         ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type);
426       } else {
427         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
428                   vl->type_instance, vl->type);
429       }
430     } else { /* vl->plugin_instance != "" */
431       if (vl->type_instance[0] == '\0') {
432         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
433                   vl->plugin_instance, vl->type);
434       } else {
435         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
436                   vl->plugin_instance, vl->type, vl->type_instance);
437       }
438     }
439   }
440
441   sfree(temp);
442   return 0;
443 }
444
445 static int wt_send_message(const char *key, const char *value, cdtime_t time,
446                            struct wt_callback *cb, const char *host,
447                            meta_data_t *md) {
448   int status;
449   size_t message_len;
450   char *temp = NULL;
451   const char *tags = "";
452   char message[1024];
453   const char *host_tags = cb->host_tags ? cb->host_tags : "";
454   const char *meta_tsdb = "tsdb_tags";
455
456   /* skip if value is NaN */
457   if (value[0] == 'n')
458     return 0;
459
460   if (md) {
461     status = meta_data_get_string(md, meta_tsdb, &temp);
462     if (status == -ENOENT) {
463       /* defaults to empty string */
464     } else if (status < 0) {
465       ERROR("write_tsdb plugin: tags metadata get failure");
466       sfree(temp);
467       pthread_mutex_unlock(&cb->send_lock);
468       return status;
469     } else {
470       tags = temp;
471     }
472   }
473
474   status =
475       ssnprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n",
476                 key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags);
477   sfree(temp);
478   if (status < 0)
479     return -1;
480   message_len = (size_t)status;
481
482   if (message_len >= sizeof(message)) {
483     ERROR("write_tsdb plugin: message buffer too small: "
484           "Need %zu bytes.",
485           message_len + 1);
486     return -1;
487   }
488
489   pthread_mutex_lock(&cb->send_lock);
490
491   if (cb->sock_fd < 0) {
492     status = wt_callback_init(cb);
493     if (status != 0) {
494       ERROR("write_tsdb plugin: wt_callback_init failed.");
495       pthread_mutex_unlock(&cb->send_lock);
496       return -1;
497     }
498   }
499
500   if (message_len >= cb->send_buf_free) {
501     status = wt_flush_nolock(0, cb);
502     if (status != 0) {
503       pthread_mutex_unlock(&cb->send_lock);
504       return status;
505     }
506   }
507
508   /* Assert that we have enough space for this message. */
509   assert(message_len < cb->send_buf_free);
510
511   /* `message_len + 1' because `message_len' does not include the
512    * trailing null byte. Neither does `send_buffer_fill'. */
513   memcpy(cb->send_buf + cb->send_buf_fill, message, message_len + 1);
514   cb->send_buf_fill += message_len;
515   cb->send_buf_free -= message_len;
516
517   DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", cb->node,
518         cb->service, cb->send_buf_fill, sizeof(cb->send_buf),
519         100.0 * ((double)cb->send_buf_fill) / ((double)sizeof(cb->send_buf)),
520         message);
521
522   pthread_mutex_unlock(&cb->send_lock);
523
524   return 0;
525 }
526
527 static int wt_write_messages(const data_set_t *ds, const value_list_t *vl,
528                              struct wt_callback *cb) {
529   char key[10 * DATA_MAX_NAME_LEN];
530   char values[512];
531
532   int status;
533
534   if (0 != strcmp(ds->type, vl->type)) {
535     ERROR("write_tsdb plugin: DS type does not match "
536           "value list type");
537     return -1;
538   }
539
540   for (size_t i = 0; i < ds->ds_num; i++) {
541     const char *ds_name = NULL;
542
543     if (cb->always_append_ds || (ds->ds_num > 1))
544       ds_name = ds->ds[i].name;
545
546     /* Copy the identifier to 'key' and escape it. */
547     status = wt_format_name(key, sizeof(key), vl, cb, ds_name);
548     if (status != 0) {
549       ERROR("write_tsdb plugin: error with format_name");
550       return status;
551     }
552
553     escape_string(key, sizeof(key));
554     /* Convert the values to an ASCII representation and put that into
555      * 'values'. */
556     status =
557         wt_format_values(values, sizeof(values), i, ds, vl, cb->store_rates);
558     if (status != 0) {
559       ERROR("write_tsdb plugin: error with "
560             "wt_format_values");
561       return status;
562     }
563
564     /* Send the message to tsdb */
565     status = wt_send_message(key, values, vl->time, cb, vl->host, vl->meta);
566     if (status != 0) {
567       ERROR("write_tsdb plugin: error with "
568             "wt_send_message");
569       return status;
570     }
571   }
572
573   return 0;
574 }
575
576 static int wt_write(const data_set_t *ds, const value_list_t *vl,
577                     user_data_t *user_data) {
578   struct wt_callback *cb;
579   int status;
580
581   if (user_data == NULL)
582     return EINVAL;
583
584   cb = user_data->data;
585
586   status = wt_write_messages(ds, vl, cb);
587
588   return status;
589 }
590
591 static int wt_config_tsd(oconfig_item_t *ci) {
592   struct wt_callback *cb;
593   char callback_name[DATA_MAX_NAME_LEN];
594
595   cb = calloc(1, sizeof(*cb));
596   if (cb == NULL) {
597     ERROR("write_tsdb plugin: calloc failed.");
598     return -1;
599   }
600   cb->sock_fd = -1;
601   cb->connect_failed_log_enabled = 1;
602   cb->next_random_ttl = new_random_ttl();
603
604   pthread_mutex_init(&cb->send_lock, NULL);
605
606   for (int i = 0; i < ci->children_num; i++) {
607     oconfig_item_t *child = ci->children + i;
608
609     if (strcasecmp("Host", child->key) == 0)
610       cf_util_get_string(child, &cb->node);
611     else if (strcasecmp("Port", child->key) == 0)
612       cf_util_get_service(child, &cb->service);
613     else if (strcasecmp("HostTags", child->key) == 0)
614       cf_util_get_string(child, &cb->host_tags);
615     else if (strcasecmp("StoreRates", child->key) == 0)
616       cf_util_get_boolean(child, &cb->store_rates);
617     else if (strcasecmp("AlwaysAppendDS", child->key) == 0)
618       cf_util_get_boolean(child, &cb->always_append_ds);
619     else {
620       ERROR("write_tsdb plugin: Invalid configuration "
621             "option: %s.",
622             child->key);
623     }
624   }
625
626   ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s",
627             cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
628             cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
629
630   user_data_t user_data = {.data = cb, .free_func = wt_callback_free};
631
632   plugin_register_write(callback_name, wt_write, &user_data);
633
634   user_data.free_func = NULL;
635   plugin_register_flush(callback_name, wt_flush, &user_data);
636
637   return 0;
638 }
639
640 static int wt_config(oconfig_item_t *ci) {
641   _Bool config_random_ttl = 0;
642
643   for (int i = 0; i < ci->children_num; i++) {
644     oconfig_item_t *child = ci->children + i;
645
646     if (strcasecmp("Node", child->key) == 0)
647       wt_config_tsd(child);
648     else if (strcasecmp("ResolveInterval", child->key) == 0)
649       cf_util_get_cdtime(child, &dnsttl);
650     else if (strcasecmp("ResolveJitter", child->key) == 0) {
651       config_random_ttl = 1;
652       cf_util_get_cdtime(child, &dnsrandomttl);
653     } else {
654       ERROR("write_tsdb plugin: Invalid configuration "
655             "option: %s.",
656             child->key);
657     }
658   }
659
660   if (!config_random_ttl)
661     dnsrandomttl = CDTIME_T_TO_DOUBLE(WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL *
662                                       plugin_get_interval());
663
664   return 0;
665 }
666
667 void module_register(void) {
668   plugin_register_complex_config("write_tsdb", wt_config);
669 }
670
671 /* vim: set sw=4 ts=4 sts=4 tw=78 et : */