write_tsdb : Add a random TTL before querying the DNS again
[collectd.git] / src / write_tsdb.c
1 /**
2  * collectd - src/write_tsdb.c
3  * Copyright (C) 2012       Pierre-Yves Ritschard
4  * Copyright (C) 2011       Scott Sanders
5  * Copyright (C) 2009       Paul Sadauskas
6  * Copyright (C) 2009       Doug MacEachern
7  * Copyright (C) 2007-2012  Florian octo Forster
8  * Copyright (C) 2013-2014  Limelight Networks, Inc.
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; only version 2 of the License is applicable.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License along
19  * with this program; if not, write to the Free Software Foundation, Inc.,
20  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
21  *
22  * Based on the write_graphite plugin. Authors:
23  *   Florian octo Forster <octo at collectd.org>
24  *   Doug MacEachern <dougm at hyperic.com>
25  *   Paul Sadauskas <psadauskas at gmail.com>
26  *   Scott Sanders <scott at jssjr.com>
27  *   Pierre-Yves Ritschard <pyr at spootnik.org>
28  * write_tsdb Authors:
29  *   Brett Hawn <bhawn at llnw.com>
30  *   Kevin Bowling <kbowling@llnw.com>
31  **/
32
33 /* write_tsdb plugin configuation example
34  *
35  * <Plugin write_tsdb>
36  *   <Node>
37  *     Host "localhost"
38  *     Port "4242"
39  *     HostTags "status=production deviceclass=www"
40  *   </Node>
41  * </Plugin>
42  */
43
44 #include "collectd.h"
45
46 #include "common.h"
47 #include "plugin.h"
48
49 #include "utils_cache.h"
50
51 #include <netdb.h>
52
53 #ifndef WT_DEFAULT_NODE
54 #define WT_DEFAULT_NODE "localhost"
55 #endif
56
57 #ifndef WT_DEFAULT_SERVICE
58 #define WT_DEFAULT_SERVICE "4242"
59 #endif
60
61 #ifndef WT_DEFAULT_ESCAPE
62 #define WT_DEFAULT_ESCAPE '.'
63 #endif
64
65 /* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */
66 #ifndef WT_SEND_BUF_SIZE
67 #define WT_SEND_BUF_SIZE 1428
68 #endif
69
70 /* Default configuration */
71
72 /* WRITE_TSDB_DEFAULT_DNS_TTL is the time we keep the dns cached info
73  * (seconds)
74  */
75 #define WRITE_TSDB_DEFAULT_DNS_TTL 600
76
77 /* WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL helps define the max random
78  * time we keep the dns cached info :
79  * min = 0
80  * max = WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL * get_plugin_interval()
81  */
82 #define WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL 15
83
84 /*
85  * Private variables
86  */
87 struct wt_callback {
88   struct addrinfo *sock_info;
89   cdtime_t sock_info_last_update;
90   int sock_fd;
91
92   char *node;
93   char *service;
94   char *host_tags;
95
96   _Bool store_rates;
97   _Bool always_append_ds;
98
99   char send_buf[WT_SEND_BUF_SIZE];
100   size_t send_buf_free;
101   size_t send_buf_fill;
102   cdtime_t send_buf_init_time;
103
104   pthread_mutex_t send_lock;
105
106   _Bool connect_failed_log_enabled;
107   int connect_dns_failed_attempts_remaining;
108   cdtime_t next_random_ttl;
109 };
110
111 static cdtime_t dnsttl = TIME_T_TO_CDTIME_T_STATIC(WRITE_TSDB_DEFAULT_DNS_TTL);
112 static double dnsrandomttl = .0;
113 static _Bool use_dnsrandomttl = 0;
114
115 /*
116  * Functions
117  */
118 static void wt_reset_buffer(struct wt_callback *cb) {
119   memset(cb->send_buf, 0, sizeof(cb->send_buf));
120   cb->send_buf_free = sizeof(cb->send_buf);
121   cb->send_buf_fill = 0;
122   cb->send_buf_init_time = cdtime();
123 }
124
125 static int wt_send_buffer(struct wt_callback *cb) {
126   ssize_t status = 0;
127
128   status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf));
129   if (status < 0) {
130     char errbuf[1024];
131     ERROR("write_tsdb plugin: send failed with status %zi (%s)", status,
132           sstrerror(errno, errbuf, sizeof(errbuf)));
133
134     close(cb->sock_fd);
135     cb->sock_fd = -1;
136
137     return -1;
138   }
139
140   return 0;
141 }
142
143 /* NOTE: You must hold cb->send_lock when calling this function! */
144 static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) {
145   int status;
146
147   DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; "
148         "send_buf_fill = %zu;",
149         (double)timeout, cb->send_buf_fill);
150
151   /* timeout == 0  => flush unconditionally */
152   if (timeout > 0) {
153     cdtime_t now;
154
155     now = cdtime();
156     if ((cb->send_buf_init_time + timeout) > now)
157       return 0;
158   }
159
160   if (cb->send_buf_fill == 0) {
161     cb->send_buf_init_time = cdtime();
162     return 0;
163   }
164
165   status = wt_send_buffer(cb);
166   wt_reset_buffer(cb);
167
168   return status;
169 }
170
171 static cdtime_t new_random_ttl() {
172   time_t ttl = 0;
173   if (use_dnsrandomttl) {
174     ttl = (time_t)(dnsrandomttl * ((double)random()) /
175                    (((double)RAND_MAX) + 1.0));
176   }
177   return TIME_T_TO_CDTIME_T(ttl);
178 }
179
180 static int wt_callback_init(struct wt_callback *cb) {
181   int status;
182   cdtime_t now;
183
184   const char *node = cb->node ? cb->node : WT_DEFAULT_NODE;
185   const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE;
186
187   if (cb->sock_fd > 0)
188     return 0;
189
190   now = cdtime();
191   if (cb->sock_info) {
192     /* When we are here, we still have the IP in cache.
193      * If we have remaining attempts without calling the DNS, we update the
194      * last_update date so we keep the info until next time.
195      * If there is no more attempts, we need to flush the cache.
196      */
197
198     if ((cb->sock_info_last_update + dnsttl + cb->next_random_ttl) < now) {
199       cb->next_random_ttl = new_random_ttl();
200       if (cb->connect_dns_failed_attempts_remaining > 0) {
201         /* Warning : this is run under send_lock mutex.
202          * This is why we do not use another mutex here.
203          * */
204         cb->sock_info_last_update = now;
205         cb->connect_dns_failed_attempts_remaining--;
206       } else {
207         freeaddrinfo(cb->sock_info);
208         cb->sock_info = NULL;
209       }
210     }
211   }
212
213   if (NULL == cb->sock_info) {
214     struct addrinfo ai_hints = {
215         .ai_family = AF_UNSPEC,
216         .ai_flags = AI_ADDRCONFIG,
217         .ai_socktype = SOCK_STREAM,
218     };
219
220     if ((cb->sock_info_last_update + dnsttl + cb->next_random_ttl) >= now) {
221       DEBUG("write_tsdb plugin: too many getaddrinfo (%s, %s) failures", node,
222             service);
223       return (-1);
224     }
225
226     cb->sock_info_last_update = now;
227     cb->next_random_ttl = new_random_ttl();
228     status = getaddrinfo(node, service, &ai_hints, &(cb->sock_info));
229     if (status != 0) {
230       if (cb->sock_info) {
231         freeaddrinfo(cb->sock_info);
232         cb->sock_info = NULL;
233       }
234       if (cb->connect_failed_log_enabled) {
235         ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", node,
236               service, gai_strerror(status));
237         cb->connect_failed_log_enabled = 0;
238       }
239       return -1;
240     }
241   }
242
243   assert(cb->sock_info != NULL);
244   for (struct addrinfo *ai_ptr = cb->sock_info; ai_ptr != NULL;
245        ai_ptr = ai_ptr->ai_next) {
246     cb->sock_fd =
247         socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
248     if (cb->sock_fd < 0)
249       continue;
250
251     set_sock_opts(cb->sock_fd);
252
253     status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
254     if (status != 0) {
255       close(cb->sock_fd);
256       cb->sock_fd = -1;
257       continue;
258     }
259
260     break;
261   }
262
263   if (cb->sock_fd < 0) {
264     char errbuf[1024];
265     ERROR("write_tsdb plugin: Connecting to %s:%s failed. "
266           "The last error was: %s",
267           node, service, sstrerror(errno, errbuf, sizeof(errbuf)));
268     return -1;
269   }
270
271   if (0 == cb->connect_failed_log_enabled) {
272     WARNING("write_tsdb plugin: Connecting to %s:%s succeeded.", node, service);
273     cb->connect_failed_log_enabled = 1;
274   }
275   cb->connect_dns_failed_attempts_remaining = 1;
276
277   wt_reset_buffer(cb);
278
279   return 0;
280 }
281
282 static void wt_callback_free(void *data) {
283   struct wt_callback *cb;
284
285   if (data == NULL)
286     return;
287
288   cb = data;
289
290   pthread_mutex_lock(&cb->send_lock);
291
292   wt_flush_nolock(0, cb);
293
294   close(cb->sock_fd);
295   cb->sock_fd = -1;
296
297   sfree(cb->node);
298   sfree(cb->service);
299   sfree(cb->host_tags);
300
301   pthread_mutex_destroy(&cb->send_lock);
302
303   sfree(cb);
304 }
305
306 static int wt_flush(cdtime_t timeout,
307                     const char *identifier __attribute__((unused)),
308                     user_data_t *user_data) {
309   struct wt_callback *cb;
310   int status;
311
312   if (user_data == NULL)
313     return -EINVAL;
314
315   cb = user_data->data;
316
317   pthread_mutex_lock(&cb->send_lock);
318
319   if (cb->sock_fd < 0) {
320     status = wt_callback_init(cb);
321     if (status != 0) {
322       ERROR("write_tsdb plugin: wt_callback_init failed.");
323       pthread_mutex_unlock(&cb->send_lock);
324       return -1;
325     }
326   }
327
328   status = wt_flush_nolock(timeout, cb);
329   pthread_mutex_unlock(&cb->send_lock);
330
331   return status;
332 }
333
334 static int wt_format_values(char *ret, size_t ret_len, int ds_num,
335                             const data_set_t *ds, const value_list_t *vl,
336                             _Bool store_rates) {
337   size_t offset = 0;
338   int status;
339   gauge_t *rates = NULL;
340
341   assert(0 == strcmp(ds->type, vl->type));
342
343   memset(ret, 0, ret_len);
344
345 #define BUFFER_ADD(...)                                                        \
346   do {                                                                         \
347     status = ssnprintf(ret + offset, ret_len - offset, __VA_ARGS__);           \
348     if (status < 1) {                                                          \
349       sfree(rates);                                                            \
350       return -1;                                                               \
351     } else if (((size_t)status) >= (ret_len - offset)) {                       \
352       sfree(rates);                                                            \
353       return -1;                                                               \
354     } else                                                                     \
355       offset += ((size_t)status);                                              \
356   } while (0)
357
358   if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
359     BUFFER_ADD(GAUGE_FORMAT, vl->values[ds_num].gauge);
360   else if (store_rates) {
361     if (rates == NULL)
362       rates = uc_get_rate(ds, vl);
363     if (rates == NULL) {
364       WARNING("format_values: "
365               "uc_get_rate failed.");
366       return -1;
367     }
368     BUFFER_ADD(GAUGE_FORMAT, rates[ds_num]);
369   } else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
370     BUFFER_ADD("%llu", vl->values[ds_num].counter);
371   else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
372     BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive);
373   else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
374     BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute);
375   else {
376     ERROR("format_values plugin: Unknown data source type: %i",
377           ds->ds[ds_num].type);
378     sfree(rates);
379     return -1;
380   }
381
382 #undef BUFFER_ADD
383
384   sfree(rates);
385   return 0;
386 }
387
388 static int wt_format_name(char *ret, int ret_len, const value_list_t *vl,
389                           const struct wt_callback *cb, const char *ds_name) {
390   int status;
391   char *temp = NULL;
392   const char *prefix = "";
393   const char *meta_prefix = "tsdb_prefix";
394
395   if (vl->meta) {
396     status = meta_data_get_string(vl->meta, meta_prefix, &temp);
397     if (status == -ENOENT) {
398       /* defaults to empty string */
399     } else if (status < 0) {
400       sfree(temp);
401       return status;
402     } else {
403       prefix = temp;
404     }
405   }
406
407   if (ds_name != NULL) {
408     if (vl->plugin_instance[0] == '\0') {
409       if (vl->type_instance[0] == '\0') {
410         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type,
411                   ds_name);
412       } else {
413         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type,
414                   vl->type_instance, ds_name);
415       }
416     } else { /* vl->plugin_instance != "" */
417       if (vl->type_instance[0] == '\0') {
418         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
419                   vl->plugin_instance, vl->type, ds_name);
420       } else {
421         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin,
422                   vl->plugin_instance, vl->type, vl->type_instance, ds_name);
423       }
424     }
425   } else { /* ds_name == NULL */
426     if (vl->plugin_instance[0] == '\0') {
427       if (vl->type_instance[0] == '\0') {
428         ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type);
429       } else {
430         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
431                   vl->type_instance, vl->type);
432       }
433     } else { /* vl->plugin_instance != "" */
434       if (vl->type_instance[0] == '\0') {
435         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
436                   vl->plugin_instance, vl->type);
437       } else {
438         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
439                   vl->plugin_instance, vl->type, vl->type_instance);
440       }
441     }
442   }
443
444   sfree(temp);
445   return 0;
446 }
447
448 static int wt_send_message(const char *key, const char *value, cdtime_t time,
449                            struct wt_callback *cb, const char *host,
450                            meta_data_t *md) {
451   int status;
452   size_t message_len;
453   char *temp = NULL;
454   const char *tags = "";
455   char message[1024];
456   const char *host_tags = cb->host_tags ? cb->host_tags : "";
457   const char *meta_tsdb = "tsdb_tags";
458
459   /* skip if value is NaN */
460   if (value[0] == 'n')
461     return 0;
462
463   if (md) {
464     status = meta_data_get_string(md, meta_tsdb, &temp);
465     if (status == -ENOENT) {
466       /* defaults to empty string */
467     } else if (status < 0) {
468       ERROR("write_tsdb plugin: tags metadata get failure");
469       sfree(temp);
470       pthread_mutex_unlock(&cb->send_lock);
471       return status;
472     } else {
473       tags = temp;
474     }
475   }
476
477   status =
478       ssnprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n",
479                 key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags);
480   sfree(temp);
481   if (status < 0)
482     return -1;
483   message_len = (size_t)status;
484
485   if (message_len >= sizeof(message)) {
486     ERROR("write_tsdb plugin: message buffer too small: "
487           "Need %zu bytes.",
488           message_len + 1);
489     return -1;
490   }
491
492   pthread_mutex_lock(&cb->send_lock);
493
494   if (cb->sock_fd < 0) {
495     status = wt_callback_init(cb);
496     if (status != 0) {
497       ERROR("write_tsdb plugin: wt_callback_init failed.");
498       pthread_mutex_unlock(&cb->send_lock);
499       return -1;
500     }
501   }
502
503   if (message_len >= cb->send_buf_free) {
504     status = wt_flush_nolock(0, cb);
505     if (status != 0) {
506       pthread_mutex_unlock(&cb->send_lock);
507       return status;
508     }
509   }
510
511   /* Assert that we have enough space for this message. */
512   assert(message_len < cb->send_buf_free);
513
514   /* `message_len + 1' because `message_len' does not include the
515    * trailing null byte. Neither does `send_buffer_fill'. */
516   memcpy(cb->send_buf + cb->send_buf_fill, message, message_len + 1);
517   cb->send_buf_fill += message_len;
518   cb->send_buf_free -= message_len;
519
520   DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", cb->node,
521         cb->service, cb->send_buf_fill, sizeof(cb->send_buf),
522         100.0 * ((double)cb->send_buf_fill) / ((double)sizeof(cb->send_buf)),
523         message);
524
525   pthread_mutex_unlock(&cb->send_lock);
526
527   return 0;
528 }
529
530 static int wt_write_messages(const data_set_t *ds, const value_list_t *vl,
531                              struct wt_callback *cb) {
532   char key[10 * DATA_MAX_NAME_LEN];
533   char values[512];
534
535   int status;
536
537   if (0 != strcmp(ds->type, vl->type)) {
538     ERROR("write_tsdb plugin: DS type does not match "
539           "value list type");
540     return -1;
541   }
542
543   for (size_t i = 0; i < ds->ds_num; i++) {
544     const char *ds_name = NULL;
545
546     if (cb->always_append_ds || (ds->ds_num > 1))
547       ds_name = ds->ds[i].name;
548
549     /* Copy the identifier to 'key' and escape it. */
550     status = wt_format_name(key, sizeof(key), vl, cb, ds_name);
551     if (status != 0) {
552       ERROR("write_tsdb plugin: error with format_name");
553       return status;
554     }
555
556     escape_string(key, sizeof(key));
557     /* Convert the values to an ASCII representation and put that into
558      * 'values'. */
559     status =
560         wt_format_values(values, sizeof(values), i, ds, vl, cb->store_rates);
561     if (status != 0) {
562       ERROR("write_tsdb plugin: error with "
563             "wt_format_values");
564       return status;
565     }
566
567     /* Send the message to tsdb */
568     status = wt_send_message(key, values, vl->time, cb, vl->host, vl->meta);
569     if (status != 0) {
570       ERROR("write_tsdb plugin: error with "
571             "wt_send_message");
572       return status;
573     }
574   }
575
576   return 0;
577 }
578
579 static int wt_write(const data_set_t *ds, const value_list_t *vl,
580                     user_data_t *user_data) {
581   struct wt_callback *cb;
582   int status;
583
584   if (user_data == NULL)
585     return EINVAL;
586
587   cb = user_data->data;
588
589   status = wt_write_messages(ds, vl, cb);
590
591   return status;
592 }
593
594 static int wt_config_tsd(oconfig_item_t *ci) {
595   struct wt_callback *cb;
596   char callback_name[DATA_MAX_NAME_LEN];
597
598   cb = calloc(1, sizeof(*cb));
599   if (cb == NULL) {
600     ERROR("write_tsdb plugin: calloc failed.");
601     return -1;
602   }
603   cb->sock_fd = -1;
604   cb->connect_failed_log_enabled = 1;
605   cb->next_random_ttl = new_random_ttl();
606
607   pthread_mutex_init(&cb->send_lock, NULL);
608
609   for (int i = 0; i < ci->children_num; i++) {
610     oconfig_item_t *child = ci->children + i;
611
612     if (strcasecmp("Host", child->key) == 0)
613       cf_util_get_string(child, &cb->node);
614     else if (strcasecmp("Port", child->key) == 0)
615       cf_util_get_service(child, &cb->service);
616     else if (strcasecmp("HostTags", child->key) == 0)
617       cf_util_get_string(child, &cb->host_tags);
618     else if (strcasecmp("StoreRates", child->key) == 0)
619       cf_util_get_boolean(child, &cb->store_rates);
620     else if (strcasecmp("AlwaysAppendDS", child->key) == 0)
621       cf_util_get_boolean(child, &cb->always_append_ds);
622     else {
623       ERROR("write_tsdb plugin: Invalid configuration "
624             "option: %s.",
625             child->key);
626     }
627   }
628
629   ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s",
630             cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
631             cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
632
633   user_data_t user_data = {.data = cb, .free_func = wt_callback_free};
634
635   plugin_register_write(callback_name, wt_write, &user_data);
636
637   user_data.free_func = NULL;
638   plugin_register_flush(callback_name, wt_flush, &user_data);
639
640   return 0;
641 }
642
643 static int wt_config(oconfig_item_t *ci) {
644   _Bool config_random_ttl = 0;
645
646   for (int i = 0; i < ci->children_num; i++) {
647     oconfig_item_t *child = ci->children + i;
648
649     if (strcasecmp("Node", child->key) == 0)
650       wt_config_tsd(child);
651     else if (strcasecmp("DNS_Cache_TTL", child->key) == 0) {
652       int ttl;
653       cf_util_get_int(child, &ttl);
654       dnsttl = TIME_T_TO_CDTIME_T(ttl);
655     } else if (strcasecmp("DNS_Random_Cache_TTL", child->key) == 0) {
656       int ttl;
657       cf_util_get_int(child, &ttl);
658       config_random_ttl = 1;
659       if (ttl) {
660         dnsrandomttl = (double)ttl;
661         use_dnsrandomttl = 1;
662       } else {
663         use_dnsrandomttl = 0;
664       }
665     } else {
666       ERROR("write_tsdb plugin: Invalid configuration "
667             "option: %s.",
668             child->key);
669     }
670   }
671
672   if (!config_random_ttl) {
673     use_dnsrandomttl = 1;
674     dnsrandomttl = CDTIME_T_TO_DOUBLE(WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL *
675                                       plugin_get_interval());
676   }
677
678   return 0;
679 }
680
681 void module_register(void) {
682   plugin_register_complex_config("write_tsdb", wt_config);
683 }
684
685 /* vim: set sw=4 ts=4 sts=4 tw=78 et : */