X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fwrite_tsdb.c;h=349b0d0c6fd2d353d745438045f52b61627ecd09;hp=af8276bd950519867750b4e779b1e205f1ff662e;hb=a9e50e9e30ecde17e167e271060c8183bfcbf407;hpb=b8bcabba154bec2e9f33f830cc588b4e57841699 diff --git a/src/write_tsdb.c b/src/write_tsdb.c index af8276bd..349b0d0c 100644 --- a/src/write_tsdb.c +++ b/src/write_tsdb.c @@ -45,8 +45,8 @@ #include "common.h" #include "plugin.h" - #include "utils_cache.h" +#include "utils_random.h" #include @@ -67,26 +67,12 @@ #define WT_SEND_BUF_SIZE 1428 #endif -/* Default configuration */ - -/* WRITE_TSDB_DEFAULT_DNS_TTL is the time we keep the dns cached info - * (seconds) - */ -#define WRITE_TSDB_DEFAULT_DNS_TTL 600 - -/* WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL helps define the max random - * time we keep the dns cached info : - * min = 0 - * max = WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL * get_plugin_interval() - */ -#define WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL 15 - /* * Private variables */ struct wt_callback { - struct addrinfo *sock_info; - cdtime_t sock_info_last_update; + struct addrinfo *ai; + cdtime_t ai_last_update; int sock_fd; char *node; @@ -108,9 +94,8 @@ struct wt_callback { cdtime_t next_random_ttl; }; -static cdtime_t dnsttl = TIME_T_TO_CDTIME_T_STATIC(WRITE_TSDB_DEFAULT_DNS_TTL); -static double dnsrandomttl = .0; -static _Bool use_dnsrandomttl = 0; +static cdtime_t resolve_interval = 0; +static cdtime_t resolve_jitter = 0; /* * Functions @@ -126,10 +111,9 @@ static int wt_send_buffer(struct wt_callback *cb) { ssize_t status = 0; status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf)); - if (status < 0) { - char errbuf[1024]; + if (status != 0) { ERROR("write_tsdb plugin: send failed with status %zi (%s)", status, - sstrerror(errno, errbuf, sizeof(errbuf))); + STRERRNO); close(cb->sock_fd); cb->sock_fd = -1; @@ -145,7 +129,7 @@ static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) { int status; DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; " - "send_buf_fill = %zu;", + "send_buf_fill = %" PRIsz ";", (double)timeout, cb->send_buf_fill); /* timeout == 0 => flush unconditionally */ @@ -169,12 +153,10 @@ static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) { } static cdtime_t new_random_ttl() { - time_t ttl = 0; - if (use_dnsrandomttl) { - ttl = (time_t)(dnsrandomttl * ((double)random()) / - (((double)RAND_MAX) + 1.0)); - } - return TIME_T_TO_CDTIME_T(ttl); + if (resolve_jitter == 0) + return 0; + + return (cdtime_t)cdrand_range(0, (long)resolve_jitter); } static int wt_callback_init(struct wt_callback *cb) { @@ -188,51 +170,51 @@ static int wt_callback_init(struct wt_callback *cb) { return 0; now = cdtime(); - if (cb->sock_info) { + if (cb->ai) { /* When we are here, we still have the IP in cache. * If we have remaining attempts without calling the DNS, we update the * last_update date so we keep the info until next time. * If there is no more attempts, we need to flush the cache. */ - if ((cb->sock_info_last_update + dnsttl + cb->next_random_ttl) < now) { + if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) < now) { cb->next_random_ttl = new_random_ttl(); if (cb->connect_dns_failed_attempts_remaining > 0) { /* Warning : this is run under send_lock mutex. * This is why we do not use another mutex here. * */ - cb->sock_info_last_update = now; + cb->ai_last_update = now; cb->connect_dns_failed_attempts_remaining--; } else { - freeaddrinfo(cb->sock_info); - cb->sock_info = NULL; + freeaddrinfo(cb->ai); + cb->ai = NULL; } } } - if (NULL == cb->sock_info) { + if (cb->ai == NULL) { + if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) >= now) { + DEBUG("write_tsdb plugin: too many getaddrinfo(%s, %s) failures", node, + service); + return -1; + } + cb->ai_last_update = now; + cb->next_random_ttl = new_random_ttl(); + struct addrinfo ai_hints = { .ai_family = AF_UNSPEC, .ai_flags = AI_ADDRCONFIG, .ai_socktype = SOCK_STREAM, }; - if ((cb->sock_info_last_update + dnsttl + cb->next_random_ttl) >= now) { - DEBUG("write_tsdb plugin: too many getaddrinfo (%s, %s) failures", node, - service); - return (-1); - } - - cb->sock_info_last_update = now; - cb->next_random_ttl = new_random_ttl(); - status = getaddrinfo(node, service, &ai_hints, &(cb->sock_info)); + status = getaddrinfo(node, service, &ai_hints, &cb->ai); if (status != 0) { - if (cb->sock_info) { - freeaddrinfo(cb->sock_info); - cb->sock_info = NULL; + if (cb->ai) { + freeaddrinfo(cb->ai); + cb->ai = NULL; } if (cb->connect_failed_log_enabled) { - ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", node, + ERROR("write_tsdb plugin: getaddrinfo(%s, %s) failed: %s", node, service, gai_strerror(status)); cb->connect_failed_log_enabled = 0; } @@ -240,17 +222,15 @@ static int wt_callback_init(struct wt_callback *cb) { } } - assert(cb->sock_info != NULL); - for (struct addrinfo *ai_ptr = cb->sock_info; ai_ptr != NULL; - ai_ptr = ai_ptr->ai_next) { - cb->sock_fd = - socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + assert(cb->ai != NULL); + for (struct addrinfo *ai = cb->ai; ai != NULL; ai = ai->ai_next) { + cb->sock_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); if (cb->sock_fd < 0) continue; set_sock_opts(cb->sock_fd); - status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + status = connect(cb->sock_fd, ai->ai_addr, ai->ai_addrlen); if (status != 0) { close(cb->sock_fd); cb->sock_fd = -1; @@ -261,10 +241,9 @@ static int wt_callback_init(struct wt_callback *cb) { } if (cb->sock_fd < 0) { - char errbuf[1024]; ERROR("write_tsdb plugin: Connecting to %s:%s failed. " "The last error was: %s", - node, service, sstrerror(errno, errbuf, sizeof(errbuf))); + node, service, STRERRNO); return -1; } @@ -298,6 +277,7 @@ static void wt_callback_free(void *data) { sfree(cb->service); sfree(cb->host_tags); + pthread_mutex_unlock(&cb->send_lock); pthread_mutex_destroy(&cb->send_lock); sfree(cb); @@ -344,7 +324,7 @@ static int wt_format_values(char *ret, size_t ret_len, int ds_num, #define BUFFER_ADD(...) \ do { \ - status = ssnprintf(ret + offset, ret_len - offset, __VA_ARGS__); \ + status = snprintf(ret + offset, ret_len - offset, __VA_ARGS__); \ if (status < 1) { \ sfree(rates); \ return -1; \ @@ -367,7 +347,7 @@ static int wt_format_values(char *ret, size_t ret_len, int ds_num, } BUFFER_ADD(GAUGE_FORMAT, rates[ds_num]); } else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) - BUFFER_ADD("%llu", vl->values[ds_num].counter); + BUFFER_ADD("%" PRIu64, (uint64_t)vl->values[ds_num].counter); else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive); else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) @@ -407,36 +387,36 @@ static int wt_format_name(char *ret, int ret_len, const value_list_t *vl, if (ds_name != NULL) { if (vl->plugin_instance[0] == '\0') { if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type, - ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type, + ds_name); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type, - vl->type_instance, ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type, + vl->type_instance, ds_name); } } else { /* vl->plugin_instance != "" */ if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type, ds_name); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, vl->type_instance, ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type, vl->type_instance, ds_name); } } } else { /* ds_name == NULL */ if (vl->plugin_instance[0] == '\0') { if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type); + snprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, - vl->type_instance, vl->type); + snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, + vl->type_instance, vl->type); } } else { /* vl->plugin_instance != "" */ if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type); + snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, vl->type_instance); + snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type, vl->type_instance); } } } @@ -475,8 +455,8 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, } status = - ssnprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n", - key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags); + snprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n", + key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags); sfree(temp); if (status < 0) return -1; @@ -484,7 +464,7 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, if (message_len >= sizeof(message)) { ERROR("write_tsdb plugin: message buffer too small: " - "Need %zu bytes.", + "Need %" PRIsz " bytes.", message_len + 1); return -1; } @@ -517,8 +497,8 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, cb->send_buf_fill += message_len; cb->send_buf_free -= message_len; - DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", cb->node, - cb->service, cb->send_buf_fill, sizeof(cb->send_buf), + DEBUG("write_tsdb plugin: [%s]:%s buf %" PRIsz "/%" PRIsz " (%.1f %%) \"%s\"", + cb->node, cb->service, cb->send_buf_fill, sizeof(cb->send_buf), 100.0 * ((double)cb->send_buf_fill) / ((double)sizeof(cb->send_buf)), message); @@ -626,9 +606,9 @@ static int wt_config_tsd(oconfig_item_t *ci) { } } - ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s", - cb->node != NULL ? cb->node : WT_DEFAULT_NODE, - cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE); + snprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s", + cb->node != NULL ? cb->node : WT_DEFAULT_NODE, + cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE); user_data_t user_data = {.data = cb, .free_func = wt_callback_free}; @@ -641,45 +621,28 @@ static int wt_config_tsd(oconfig_item_t *ci) { } static int wt_config(oconfig_item_t *ci) { - _Bool config_random_ttl = 0; + if ((resolve_interval == 0) && (resolve_jitter == 0)) + resolve_interval = resolve_jitter = plugin_get_interval(); for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; if (strcasecmp("Node", child->key) == 0) wt_config_tsd(child); - else if (strcasecmp("DNS_Cache_TTL", child->key) == 0) { - int ttl; - cf_util_get_int(child, &ttl); - dnsttl = TIME_T_TO_CDTIME_T(ttl); - } else if (strcasecmp("DNS_Random_Cache_TTL", child->key) == 0) { - int ttl; - cf_util_get_int(child, &ttl); - config_random_ttl = 1; - if (ttl) { - dnsrandomttl = (double)ttl; - use_dnsrandomttl = 1; - } else { - use_dnsrandomttl = 0; - } - } else { + else if (strcasecmp("ResolveInterval", child->key) == 0) + cf_util_get_cdtime(child, &resolve_interval); + else if (strcasecmp("ResolveJitter", child->key) == 0) + cf_util_get_cdtime(child, &resolve_jitter); + else { ERROR("write_tsdb plugin: Invalid configuration " "option: %s.", child->key); } } - if (!config_random_ttl) { - use_dnsrandomttl = 1; - dnsrandomttl = CDTIME_T_TO_DOUBLE(WRITE_TSDB_DEFAULT_DNS_RANDOM_TTL * - plugin_get_interval()); - } - return 0; } void module_register(void) { plugin_register_complex_config("write_tsdb", wt_config); } - -/* vim: set sw=4 ts=4 sts=4 tw=78 et : */