write_tsdb plugin: Use cdrand_range().
[collectd.git] / src / write_tsdb.c
1 /**
2  * collectd - src/write_tsdb.c
3  * Copyright (C) 2012       Pierre-Yves Ritschard
4  * Copyright (C) 2011       Scott Sanders
5  * Copyright (C) 2009       Paul Sadauskas
6  * Copyright (C) 2009       Doug MacEachern
7  * Copyright (C) 2007-2012  Florian octo Forster
8  * Copyright (C) 2013-2014  Limelight Networks, Inc.
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; only version 2 of the License is applicable.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License along
19  * with this program; if not, write to the Free Software Foundation, Inc.,
20  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
21  *
22  * Based on the write_graphite plugin. Authors:
23  *   Florian octo Forster <octo at collectd.org>
24  *   Doug MacEachern <dougm at hyperic.com>
25  *   Paul Sadauskas <psadauskas at gmail.com>
26  *   Scott Sanders <scott at jssjr.com>
27  *   Pierre-Yves Ritschard <pyr at spootnik.org>
28  * write_tsdb Authors:
29  *   Brett Hawn <bhawn at llnw.com>
30  *   Kevin Bowling <kbowling@llnw.com>
31  **/
32
33 /* write_tsdb plugin configuation example
34  *
35  * <Plugin write_tsdb>
36  *   <Node>
37  *     Host "localhost"
38  *     Port "4242"
39  *     HostTags "status=production deviceclass=www"
40  *   </Node>
41  * </Plugin>
42  */
43
44 #include "collectd.h"
45
46 #include "common.h"
47 #include "plugin.h"
48 #include "utils_cache.h"
49 #include "utils_random.h"
50
51 #include <netdb.h>
52
53 #ifndef WT_DEFAULT_NODE
54 #define WT_DEFAULT_NODE "localhost"
55 #endif
56
57 #ifndef WT_DEFAULT_SERVICE
58 #define WT_DEFAULT_SERVICE "4242"
59 #endif
60
61 #ifndef WT_DEFAULT_ESCAPE
62 #define WT_DEFAULT_ESCAPE '.'
63 #endif
64
65 /* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */
66 #ifndef WT_SEND_BUF_SIZE
67 #define WT_SEND_BUF_SIZE 1428
68 #endif
69
70 /* Default configuration */
71
72 /* WRITE_TSDB_DEFAULT_DNS_TTL is the time we keep the dns cached info
73  * (seconds)
74  */
75 #define WRITE_TSDB_DEFAULT_DNS_TTL 600
76
77 /* WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL helps define the max random
78  * time we keep the dns cached info :
79  * min = 0
80  * max = WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL * get_plugin_interval()
81  */
82 #define WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL 15
83
84 /*
85  * Private variables
86  */
87 struct wt_callback {
88   struct addrinfo *ai;
89   cdtime_t ai_last_update;
90   int sock_fd;
91
92   char *node;
93   char *service;
94   char *host_tags;
95
96   _Bool store_rates;
97   _Bool always_append_ds;
98
99   char send_buf[WT_SEND_BUF_SIZE];
100   size_t send_buf_free;
101   size_t send_buf_fill;
102   cdtime_t send_buf_init_time;
103
104   pthread_mutex_t send_lock;
105
106   _Bool connect_failed_log_enabled;
107   int connect_dns_failed_attempts_remaining;
108   cdtime_t next_random_ttl;
109 };
110
111 static cdtime_t resolve_interval =
112     TIME_T_TO_CDTIME_T_STATIC(WRITE_TSDB_DEFAULT_DNS_TTL);
113 static cdtime_t resolve_jitter = 0;
114
115 /*
116  * Functions
117  */
118 static void wt_reset_buffer(struct wt_callback *cb) {
119   memset(cb->send_buf, 0, sizeof(cb->send_buf));
120   cb->send_buf_free = sizeof(cb->send_buf);
121   cb->send_buf_fill = 0;
122   cb->send_buf_init_time = cdtime();
123 }
124
125 static int wt_send_buffer(struct wt_callback *cb) {
126   ssize_t status = 0;
127
128   status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf));
129   if (status < 0) {
130     char errbuf[1024];
131     ERROR("write_tsdb plugin: send failed with status %zi (%s)", status,
132           sstrerror(errno, errbuf, sizeof(errbuf)));
133
134     close(cb->sock_fd);
135     cb->sock_fd = -1;
136
137     return -1;
138   }
139
140   return 0;
141 }
142
143 /* NOTE: You must hold cb->send_lock when calling this function! */
144 static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) {
145   int status;
146
147   DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; "
148         "send_buf_fill = %zu;",
149         (double)timeout, cb->send_buf_fill);
150
151   /* timeout == 0  => flush unconditionally */
152   if (timeout > 0) {
153     cdtime_t now;
154
155     now = cdtime();
156     if ((cb->send_buf_init_time + timeout) > now)
157       return 0;
158   }
159
160   if (cb->send_buf_fill == 0) {
161     cb->send_buf_init_time = cdtime();
162     return 0;
163   }
164
165   status = wt_send_buffer(cb);
166   wt_reset_buffer(cb);
167
168   return status;
169 }
170
171 static cdtime_t new_random_ttl() {
172   if (resolve_jitter == 0)
173     return 0;
174
175   return (cdtime_t)cdrand_range(0, (long)resolve_jitter);
176 }
177
178 static int wt_callback_init(struct wt_callback *cb) {
179   int status;
180   cdtime_t now;
181
182   const char *node = cb->node ? cb->node : WT_DEFAULT_NODE;
183   const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE;
184
185   if (cb->sock_fd > 0)
186     return 0;
187
188   now = cdtime();
189   if (cb->ai) {
190     /* When we are here, we still have the IP in cache.
191      * If we have remaining attempts without calling the DNS, we update the
192      * last_update date so we keep the info until next time.
193      * If there is no more attempts, we need to flush the cache.
194      */
195
196     if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) < now) {
197       cb->next_random_ttl = new_random_ttl();
198       if (cb->connect_dns_failed_attempts_remaining > 0) {
199         /* Warning : this is run under send_lock mutex.
200          * This is why we do not use another mutex here.
201          * */
202         cb->ai_last_update = now;
203         cb->connect_dns_failed_attempts_remaining--;
204       } else {
205         freeaddrinfo(cb->ai);
206         cb->ai = NULL;
207       }
208     }
209   }
210
211   if (cb->ai == NULL) {
212     if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) >= now) {
213       DEBUG("write_tsdb plugin: too many getaddrinfo(%s, %s) failures", node,
214             service);
215       return (-1);
216     }
217     cb->ai_last_update = now;
218     cb->next_random_ttl = new_random_ttl();
219
220     struct addrinfo ai_hints = {
221         .ai_family = AF_UNSPEC,
222         .ai_flags = AI_ADDRCONFIG,
223         .ai_socktype = SOCK_STREAM,
224     };
225
226     status = getaddrinfo(node, service, &ai_hints, &cb->ai);
227     if (status != 0) {
228       if (cb->ai) {
229         freeaddrinfo(cb->ai);
230         cb->ai = NULL;
231       }
232       if (cb->connect_failed_log_enabled) {
233         ERROR("write_tsdb plugin: getaddrinfo(%s, %s) failed: %s", node,
234               service, gai_strerror(status));
235         cb->connect_failed_log_enabled = 0;
236       }
237       return -1;
238     }
239   }
240
241   assert(cb->ai != NULL);
242   for (struct addrinfo *ai = cb->ai; ai != NULL; ai = ai->ai_next) {
243     cb->sock_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
244     if (cb->sock_fd < 0)
245       continue;
246
247     set_sock_opts(cb->sock_fd);
248
249     status = connect(cb->sock_fd, ai->ai_addr, ai->ai_addrlen);
250     if (status != 0) {
251       close(cb->sock_fd);
252       cb->sock_fd = -1;
253       continue;
254     }
255
256     break;
257   }
258
259   if (cb->sock_fd < 0) {
260     char errbuf[1024];
261     ERROR("write_tsdb plugin: Connecting to %s:%s failed. "
262           "The last error was: %s",
263           node, service, sstrerror(errno, errbuf, sizeof(errbuf)));
264     return -1;
265   }
266
267   if (0 == cb->connect_failed_log_enabled) {
268     WARNING("write_tsdb plugin: Connecting to %s:%s succeeded.", node, service);
269     cb->connect_failed_log_enabled = 1;
270   }
271   cb->connect_dns_failed_attempts_remaining = 1;
272
273   wt_reset_buffer(cb);
274
275   return 0;
276 }
277
278 static void wt_callback_free(void *data) {
279   struct wt_callback *cb;
280
281   if (data == NULL)
282     return;
283
284   cb = data;
285
286   pthread_mutex_lock(&cb->send_lock);
287
288   wt_flush_nolock(0, cb);
289
290   close(cb->sock_fd);
291   cb->sock_fd = -1;
292
293   sfree(cb->node);
294   sfree(cb->service);
295   sfree(cb->host_tags);
296
297   pthread_mutex_destroy(&cb->send_lock);
298
299   sfree(cb);
300 }
301
302 static int wt_flush(cdtime_t timeout,
303                     const char *identifier __attribute__((unused)),
304                     user_data_t *user_data) {
305   struct wt_callback *cb;
306   int status;
307
308   if (user_data == NULL)
309     return -EINVAL;
310
311   cb = user_data->data;
312
313   pthread_mutex_lock(&cb->send_lock);
314
315   if (cb->sock_fd < 0) {
316     status = wt_callback_init(cb);
317     if (status != 0) {
318       ERROR("write_tsdb plugin: wt_callback_init failed.");
319       pthread_mutex_unlock(&cb->send_lock);
320       return -1;
321     }
322   }
323
324   status = wt_flush_nolock(timeout, cb);
325   pthread_mutex_unlock(&cb->send_lock);
326
327   return status;
328 }
329
330 static int wt_format_values(char *ret, size_t ret_len, int ds_num,
331                             const data_set_t *ds, const value_list_t *vl,
332                             _Bool store_rates) {
333   size_t offset = 0;
334   int status;
335   gauge_t *rates = NULL;
336
337   assert(0 == strcmp(ds->type, vl->type));
338
339   memset(ret, 0, ret_len);
340
341 #define BUFFER_ADD(...)                                                        \
342   do {                                                                         \
343     status = ssnprintf(ret + offset, ret_len - offset, __VA_ARGS__);           \
344     if (status < 1) {                                                          \
345       sfree(rates);                                                            \
346       return -1;                                                               \
347     } else if (((size_t)status) >= (ret_len - offset)) {                       \
348       sfree(rates);                                                            \
349       return -1;                                                               \
350     } else                                                                     \
351       offset += ((size_t)status);                                              \
352   } while (0)
353
354   if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
355     BUFFER_ADD(GAUGE_FORMAT, vl->values[ds_num].gauge);
356   else if (store_rates) {
357     if (rates == NULL)
358       rates = uc_get_rate(ds, vl);
359     if (rates == NULL) {
360       WARNING("format_values: "
361               "uc_get_rate failed.");
362       return -1;
363     }
364     BUFFER_ADD(GAUGE_FORMAT, rates[ds_num]);
365   } else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
366     BUFFER_ADD("%llu", vl->values[ds_num].counter);
367   else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
368     BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive);
369   else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
370     BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute);
371   else {
372     ERROR("format_values plugin: Unknown data source type: %i",
373           ds->ds[ds_num].type);
374     sfree(rates);
375     return -1;
376   }
377
378 #undef BUFFER_ADD
379
380   sfree(rates);
381   return 0;
382 }
383
384 static int wt_format_name(char *ret, int ret_len, const value_list_t *vl,
385                           const struct wt_callback *cb, const char *ds_name) {
386   int status;
387   char *temp = NULL;
388   const char *prefix = "";
389   const char *meta_prefix = "tsdb_prefix";
390
391   if (vl->meta) {
392     status = meta_data_get_string(vl->meta, meta_prefix, &temp);
393     if (status == -ENOENT) {
394       /* defaults to empty string */
395     } else if (status < 0) {
396       sfree(temp);
397       return status;
398     } else {
399       prefix = temp;
400     }
401   }
402
403   if (ds_name != NULL) {
404     if (vl->plugin_instance[0] == '\0') {
405       if (vl->type_instance[0] == '\0') {
406         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type,
407                   ds_name);
408       } else {
409         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type,
410                   vl->type_instance, ds_name);
411       }
412     } else { /* vl->plugin_instance != "" */
413       if (vl->type_instance[0] == '\0') {
414         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
415                   vl->plugin_instance, vl->type, ds_name);
416       } else {
417         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin,
418                   vl->plugin_instance, vl->type, vl->type_instance, ds_name);
419       }
420     }
421   } else { /* ds_name == NULL */
422     if (vl->plugin_instance[0] == '\0') {
423       if (vl->type_instance[0] == '\0') {
424         ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type);
425       } else {
426         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
427                   vl->type_instance, vl->type);
428       }
429     } else { /* vl->plugin_instance != "" */
430       if (vl->type_instance[0] == '\0') {
431         ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
432                   vl->plugin_instance, vl->type);
433       } else {
434         ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
435                   vl->plugin_instance, vl->type, vl->type_instance);
436       }
437     }
438   }
439
440   sfree(temp);
441   return 0;
442 }
443
444 static int wt_send_message(const char *key, const char *value, cdtime_t time,
445                            struct wt_callback *cb, const char *host,
446                            meta_data_t *md) {
447   int status;
448   size_t message_len;
449   char *temp = NULL;
450   const char *tags = "";
451   char message[1024];
452   const char *host_tags = cb->host_tags ? cb->host_tags : "";
453   const char *meta_tsdb = "tsdb_tags";
454
455   /* skip if value is NaN */
456   if (value[0] == 'n')
457     return 0;
458
459   if (md) {
460     status = meta_data_get_string(md, meta_tsdb, &temp);
461     if (status == -ENOENT) {
462       /* defaults to empty string */
463     } else if (status < 0) {
464       ERROR("write_tsdb plugin: tags metadata get failure");
465       sfree(temp);
466       pthread_mutex_unlock(&cb->send_lock);
467       return status;
468     } else {
469       tags = temp;
470     }
471   }
472
473   status =
474       ssnprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n",
475                 key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags);
476   sfree(temp);
477   if (status < 0)
478     return -1;
479   message_len = (size_t)status;
480
481   if (message_len >= sizeof(message)) {
482     ERROR("write_tsdb plugin: message buffer too small: "
483           "Need %zu bytes.",
484           message_len + 1);
485     return -1;
486   }
487
488   pthread_mutex_lock(&cb->send_lock);
489
490   if (cb->sock_fd < 0) {
491     status = wt_callback_init(cb);
492     if (status != 0) {
493       ERROR("write_tsdb plugin: wt_callback_init failed.");
494       pthread_mutex_unlock(&cb->send_lock);
495       return -1;
496     }
497   }
498
499   if (message_len >= cb->send_buf_free) {
500     status = wt_flush_nolock(0, cb);
501     if (status != 0) {
502       pthread_mutex_unlock(&cb->send_lock);
503       return status;
504     }
505   }
506
507   /* Assert that we have enough space for this message. */
508   assert(message_len < cb->send_buf_free);
509
510   /* `message_len + 1' because `message_len' does not include the
511    * trailing null byte. Neither does `send_buffer_fill'. */
512   memcpy(cb->send_buf + cb->send_buf_fill, message, message_len + 1);
513   cb->send_buf_fill += message_len;
514   cb->send_buf_free -= message_len;
515
516   DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", cb->node,
517         cb->service, cb->send_buf_fill, sizeof(cb->send_buf),
518         100.0 * ((double)cb->send_buf_fill) / ((double)sizeof(cb->send_buf)),
519         message);
520
521   pthread_mutex_unlock(&cb->send_lock);
522
523   return 0;
524 }
525
526 static int wt_write_messages(const data_set_t *ds, const value_list_t *vl,
527                              struct wt_callback *cb) {
528   char key[10 * DATA_MAX_NAME_LEN];
529   char values[512];
530
531   int status;
532
533   if (0 != strcmp(ds->type, vl->type)) {
534     ERROR("write_tsdb plugin: DS type does not match "
535           "value list type");
536     return -1;
537   }
538
539   for (size_t i = 0; i < ds->ds_num; i++) {
540     const char *ds_name = NULL;
541
542     if (cb->always_append_ds || (ds->ds_num > 1))
543       ds_name = ds->ds[i].name;
544
545     /* Copy the identifier to 'key' and escape it. */
546     status = wt_format_name(key, sizeof(key), vl, cb, ds_name);
547     if (status != 0) {
548       ERROR("write_tsdb plugin: error with format_name");
549       return status;
550     }
551
552     escape_string(key, sizeof(key));
553     /* Convert the values to an ASCII representation and put that into
554      * 'values'. */
555     status =
556         wt_format_values(values, sizeof(values), i, ds, vl, cb->store_rates);
557     if (status != 0) {
558       ERROR("write_tsdb plugin: error with "
559             "wt_format_values");
560       return status;
561     }
562
563     /* Send the message to tsdb */
564     status = wt_send_message(key, values, vl->time, cb, vl->host, vl->meta);
565     if (status != 0) {
566       ERROR("write_tsdb plugin: error with "
567             "wt_send_message");
568       return status;
569     }
570   }
571
572   return 0;
573 }
574
575 static int wt_write(const data_set_t *ds, const value_list_t *vl,
576                     user_data_t *user_data) {
577   struct wt_callback *cb;
578   int status;
579
580   if (user_data == NULL)
581     return EINVAL;
582
583   cb = user_data->data;
584
585   status = wt_write_messages(ds, vl, cb);
586
587   return status;
588 }
589
590 static int wt_config_tsd(oconfig_item_t *ci) {
591   struct wt_callback *cb;
592   char callback_name[DATA_MAX_NAME_LEN];
593
594   cb = calloc(1, sizeof(*cb));
595   if (cb == NULL) {
596     ERROR("write_tsdb plugin: calloc failed.");
597     return -1;
598   }
599   cb->sock_fd = -1;
600   cb->connect_failed_log_enabled = 1;
601   cb->next_random_ttl = new_random_ttl();
602
603   pthread_mutex_init(&cb->send_lock, NULL);
604
605   for (int i = 0; i < ci->children_num; i++) {
606     oconfig_item_t *child = ci->children + i;
607
608     if (strcasecmp("Host", child->key) == 0)
609       cf_util_get_string(child, &cb->node);
610     else if (strcasecmp("Port", child->key) == 0)
611       cf_util_get_service(child, &cb->service);
612     else if (strcasecmp("HostTags", child->key) == 0)
613       cf_util_get_string(child, &cb->host_tags);
614     else if (strcasecmp("StoreRates", child->key) == 0)
615       cf_util_get_boolean(child, &cb->store_rates);
616     else if (strcasecmp("AlwaysAppendDS", child->key) == 0)
617       cf_util_get_boolean(child, &cb->always_append_ds);
618     else {
619       ERROR("write_tsdb plugin: Invalid configuration "
620             "option: %s.",
621             child->key);
622     }
623   }
624
625   ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s",
626             cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
627             cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
628
629   user_data_t user_data = {.data = cb, .free_func = wt_callback_free};
630
631   plugin_register_write(callback_name, wt_write, &user_data);
632
633   user_data.free_func = NULL;
634   plugin_register_flush(callback_name, wt_flush, &user_data);
635
636   return 0;
637 }
638
639 static int wt_config(oconfig_item_t *ci) {
640   _Bool config_random_ttl = 0;
641
642   for (int i = 0; i < ci->children_num; i++) {
643     oconfig_item_t *child = ci->children + i;
644
645     if (strcasecmp("Node", child->key) == 0)
646       wt_config_tsd(child);
647     else if (strcasecmp("ResolveInterval", child->key) == 0)
648       cf_util_get_cdtime(child, &resolve_interval);
649     else if (strcasecmp("ResolveJitter", child->key) == 0) {
650       config_random_ttl = 1;
651       cf_util_get_cdtime(child, &resolve_jitter);
652     } else {
653       ERROR("write_tsdb plugin: Invalid configuration "
654             "option: %s.",
655             child->key);
656     }
657   }
658
659   if (!config_random_ttl)
660     resolve_jitter = CDTIME_T_TO_DOUBLE(WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL *
661                                         plugin_get_interval());
662
663   return 0;
664 }
665
666 void module_register(void) {
667   plugin_register_complex_config("write_tsdb", wt_config);
668 }
669
670 /* vim: set sw=4 ts=4 sts=4 tw=78 et : */