X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fwrite_tsdb.c;h=5c43eda3d2eb9de2b69d2027a54cf01b94f55e8c;hp=429c159757c8047016892ff18cc5ef8d56a371ec;hb=7111bb6df7628edce3a8e538b386fbe27633a191;hpb=c6baff42e7e8e547eb7ca7817c0e0e07ccea95db diff --git a/src/write_tsdb.c b/src/write_tsdb.c index 429c1597..5c43eda3 100644 --- a/src/write_tsdb.c +++ b/src/write_tsdb.c @@ -45,8 +45,8 @@ #include "common.h" #include "plugin.h" - #include "utils_cache.h" +#include "utils_random.h" #include @@ -71,6 +71,8 @@ * Private variables */ struct wt_callback { + struct addrinfo *ai; + cdtime_t ai_last_update; int sock_fd; char *node; @@ -86,8 +88,15 @@ struct wt_callback { cdtime_t send_buf_init_time; pthread_mutex_t send_lock; + + _Bool connect_failed_log_enabled; + int connect_dns_failed_attempts_remaining; + cdtime_t next_random_ttl; }; +static cdtime_t resolve_interval = 0; +static cdtime_t resolve_jitter = 0; + /* * Functions */ @@ -102,7 +111,7 @@ static int wt_send_buffer(struct wt_callback *cb) { ssize_t status = 0; status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf)); - if (status < 0) { + if (status != 0) { char errbuf[1024]; ERROR("write_tsdb plugin: send failed with status %zi (%s)", status, sstrerror(errno, errbuf, sizeof(errbuf))); @@ -144,9 +153,16 @@ static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) { return status; } +static cdtime_t new_random_ttl() { + if (resolve_jitter == 0) + return 0; + + return (cdtime_t)cdrand_range(0, (long)resolve_jitter); +} + static int wt_callback_init(struct wt_callback *cb) { - struct addrinfo *ai_list; int status; + cdtime_t now; const char *node = cb->node ? cb->node : WT_DEFAULT_NODE; const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE; @@ -154,28 +170,68 @@ static int wt_callback_init(struct wt_callback *cb) { if (cb->sock_fd > 0) return 0; - struct addrinfo ai_hints = {.ai_family = AF_UNSPEC, - .ai_flags = AI_ADDRCONFIG, - .ai_socktype = SOCK_STREAM}; + now = cdtime(); + if (cb->ai) { + /* When we are here, we still have the IP in cache. + * If we have remaining attempts without calling the DNS, we update the + * last_update date so we keep the info until next time. + * If there is no more attempts, we need to flush the cache. + */ + + if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) < now) { + cb->next_random_ttl = new_random_ttl(); + if (cb->connect_dns_failed_attempts_remaining > 0) { + /* Warning : this is run under send_lock mutex. + * This is why we do not use another mutex here. + * */ + cb->ai_last_update = now; + cb->connect_dns_failed_attempts_remaining--; + } else { + freeaddrinfo(cb->ai); + cb->ai = NULL; + } + } + } - status = getaddrinfo(node, service, &ai_hints, &ai_list); - if (status != 0) { - ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", node, service, - gai_strerror(status)); - return -1; + if (cb->ai == NULL) { + if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) >= now) { + DEBUG("write_tsdb plugin: too many getaddrinfo(%s, %s) failures", node, + service); + return -1; + } + cb->ai_last_update = now; + cb->next_random_ttl = new_random_ttl(); + + struct addrinfo ai_hints = { + .ai_family = AF_UNSPEC, + .ai_flags = AI_ADDRCONFIG, + .ai_socktype = SOCK_STREAM, + }; + + status = getaddrinfo(node, service, &ai_hints, &cb->ai); + if (status != 0) { + if (cb->ai) { + freeaddrinfo(cb->ai); + cb->ai = NULL; + } + if (cb->connect_failed_log_enabled) { + ERROR("write_tsdb plugin: getaddrinfo(%s, %s) failed: %s", node, + service, gai_strerror(status)); + cb->connect_failed_log_enabled = 0; + } + return -1; + } } - assert(ai_list != NULL); - for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; - ai_ptr = ai_ptr->ai_next) { - cb->sock_fd = - socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + assert(cb->ai != NULL); + for (struct addrinfo *ai = cb->ai; ai != NULL; ai = ai->ai_next) { + cb->sock_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); if (cb->sock_fd < 0) continue; set_sock_opts(cb->sock_fd); - status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + status = connect(cb->sock_fd, ai->ai_addr, ai->ai_addrlen); if (status != 0) { close(cb->sock_fd); cb->sock_fd = -1; @@ -185,8 +241,6 @@ static int wt_callback_init(struct wt_callback *cb) { break; } - freeaddrinfo(ai_list); - if (cb->sock_fd < 0) { char errbuf[1024]; ERROR("write_tsdb plugin: Connecting to %s:%s failed. " @@ -195,6 +249,12 @@ static int wt_callback_init(struct wt_callback *cb) { return -1; } + if (0 == cb->connect_failed_log_enabled) { + WARNING("write_tsdb plugin: Connecting to %s:%s succeeded.", node, service); + cb->connect_failed_log_enabled = 1; + } + cb->connect_dns_failed_attempts_remaining = 1; + wt_reset_buffer(cb); return 0; @@ -219,6 +279,7 @@ static void wt_callback_free(void *data) { sfree(cb->service); sfree(cb->host_tags); + pthread_mutex_unlock(&cb->send_lock); pthread_mutex_destroy(&cb->send_lock); sfree(cb); @@ -265,7 +326,7 @@ static int wt_format_values(char *ret, size_t ret_len, int ds_num, #define BUFFER_ADD(...) \ do { \ - status = ssnprintf(ret + offset, ret_len - offset, __VA_ARGS__); \ + status = snprintf(ret + offset, ret_len - offset, __VA_ARGS__); \ if (status < 1) { \ sfree(rates); \ return -1; \ @@ -328,36 +389,36 @@ static int wt_format_name(char *ret, int ret_len, const value_list_t *vl, if (ds_name != NULL) { if (vl->plugin_instance[0] == '\0') { if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type, - ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type, + ds_name); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type, - vl->type_instance, ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type, + vl->type_instance, ds_name); } } else { /* vl->plugin_instance != "" */ if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type, ds_name); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, vl->type_instance, ds_name); + snprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type, vl->type_instance, ds_name); } } } else { /* ds_name == NULL */ if (vl->plugin_instance[0] == '\0') { if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type); + snprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, - vl->type_instance, vl->type); + snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, + vl->type_instance, vl->type); } } else { /* vl->plugin_instance != "" */ if (vl->type_instance[0] == '\0') { - ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type); + snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type); } else { - ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, - vl->plugin_instance, vl->type, vl->type_instance); + snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, + vl->plugin_instance, vl->type, vl->type_instance); } } } @@ -396,8 +457,8 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time, } status = - ssnprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n", - key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags); + snprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n", + key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags); sfree(temp); if (status < 0) return -1; @@ -522,10 +583,8 @@ static int wt_config_tsd(oconfig_item_t *ci) { return -1; } cb->sock_fd = -1; - cb->node = NULL; - cb->service = NULL; - cb->host_tags = NULL; - cb->store_rates = 0; + cb->connect_failed_log_enabled = 1; + cb->next_random_ttl = new_random_ttl(); pthread_mutex_init(&cb->send_lock, NULL); @@ -549,9 +608,9 @@ static int wt_config_tsd(oconfig_item_t *ci) { } } - ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s", - cb->node != NULL ? cb->node : WT_DEFAULT_NODE, - cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE); + snprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s", + cb->node != NULL ? cb->node : WT_DEFAULT_NODE, + cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE); user_data_t user_data = {.data = cb, .free_func = wt_callback_free}; @@ -564,11 +623,18 @@ static int wt_config_tsd(oconfig_item_t *ci) { } static int wt_config(oconfig_item_t *ci) { + if ((resolve_interval == 0) && (resolve_jitter == 0)) + resolve_interval = resolve_jitter = plugin_get_interval(); + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; if (strcasecmp("Node", child->key) == 0) wt_config_tsd(child); + else if (strcasecmp("ResolveInterval", child->key) == 0) + cf_util_get_cdtime(child, &resolve_interval); + else if (strcasecmp("ResolveJitter", child->key) == 0) + cf_util_get_cdtime(child, &resolve_jitter); else { ERROR("write_tsdb plugin: Invalid configuration " "option: %s.",