Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / write_tsdb.c
index d1d65ca..5c43eda 100644 (file)
@@ -45,8 +45,8 @@
 
 #include "common.h"
 #include "plugin.h"
-
 #include "utils_cache.h"
+#include "utils_random.h"
 
 #include <netdb.h>
 
@@ -71,8 +71,8 @@
  * Private variables
  */
 struct wt_callback {
-  struct addrinfo *sock_info;
-  cdtime_t sock_info_last_update;
+  struct addrinfo *ai;
+  cdtime_t ai_last_update;
   int sock_fd;
 
   char *node;
@@ -88,9 +88,14 @@ struct wt_callback {
   cdtime_t send_buf_init_time;
 
   pthread_mutex_t send_lock;
+
+  _Bool connect_failed_log_enabled;
+  int connect_dns_failed_attempts_remaining;
+  cdtime_t next_random_ttl;
 };
 
-static cdtime_t dnsttl = TIME_T_TO_CDTIME_T_STATIC(60);
+static cdtime_t resolve_interval = 0;
+static cdtime_t resolve_jitter = 0;
 
 /*
  * Functions
@@ -106,7 +111,7 @@ static int wt_send_buffer(struct wt_callback *cb) {
   ssize_t status = 0;
 
   status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf));
-  if (status < 0) {
+  if (status != 0) {
     char errbuf[1024];
     ERROR("write_tsdb plugin: send failed with status %zi (%s)", status,
           sstrerror(errno, errbuf, sizeof(errbuf)));
@@ -148,6 +153,13 @@ static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) {
   return status;
 }
 
+static cdtime_t new_random_ttl() {
+  if (resolve_jitter == 0)
+    return 0;
+
+  return (cdtime_t)cdrand_range(0, (long)resolve_jitter);
+}
+
 static int wt_callback_init(struct wt_callback *cb) {
   int status;
   cdtime_t now;
@@ -159,35 +171,51 @@ static int wt_callback_init(struct wt_callback *cb) {
     return 0;
 
   now = cdtime();
-  if ((cb->sock_info_last_update + dnsttl) < now) {
-    if (cb->sock_info) {
-      freeaddrinfo(cb->sock_info);
-      cb->sock_info = NULL;
+  if (cb->ai) {
+    /* When we are here, we still have the IP in cache.
+     * If we have remaining attempts without calling the DNS, we update the
+     * last_update date so we keep the info until next time.
+     * If there is no more attempts, we need to flush the cache.
+     */
+
+    if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) < now) {
+      cb->next_random_ttl = new_random_ttl();
+      if (cb->connect_dns_failed_attempts_remaining > 0) {
+        /* Warning : this is run under send_lock mutex.
+         * This is why we do not use another mutex here.
+         * */
+        cb->ai_last_update = now;
+        cb->connect_dns_failed_attempts_remaining--;
+      } else {
+        freeaddrinfo(cb->ai);
+        cb->ai = NULL;
+      }
     }
   }
 
-  if (NULL == cb->sock_info) {
+  if (cb->ai == NULL) {
+    if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) >= now) {
+      DEBUG("write_tsdb plugin: too many getaddrinfo(%s, %s) failures", node,
+            service);
+      return -1;
+    }
+    cb->ai_last_update = now;
+    cb->next_random_ttl = new_random_ttl();
+
     struct addrinfo ai_hints = {
         .ai_family = AF_UNSPEC,
         .ai_flags = AI_ADDRCONFIG,
         .ai_socktype = SOCK_STREAM,
     };
 
-    if ((cb->sock_info_last_update + dnsttl) >= now) {
-      DEBUG("write_tsdb plugin: too many getaddrinfo (%s, %s) failures", node,
-            service);
-      return (-1);
-    }
-
-    cb->sock_info_last_update = now;
-    status = getaddrinfo(node, service, &ai_hints, &(cb->sock_info));
+    status = getaddrinfo(node, service, &ai_hints, &cb->ai);
     if (status != 0) {
-      if (cb->sock_info) {
-        freeaddrinfo(cb->sock_info);
-        cb->sock_info = NULL;
+      if (cb->ai) {
+        freeaddrinfo(cb->ai);
+        cb->ai = NULL;
       }
       if (cb->connect_failed_log_enabled) {
-        ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", node,
+        ERROR("write_tsdb plugin: getaddrinfo(%s, %s) failed: %s", node,
               service, gai_strerror(status));
         cb->connect_failed_log_enabled = 0;
       }
@@ -195,17 +223,15 @@ static int wt_callback_init(struct wt_callback *cb) {
     }
   }
 
-  assert(cb->sock_info != NULL);
-  for (struct addrinfo *ai_ptr = cb->sock_info; ai_ptr != NULL;
-       ai_ptr = ai_ptr->ai_next) {
-    cb->sock_fd =
-        socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+  assert(cb->ai != NULL);
+  for (struct addrinfo *ai = cb->ai; ai != NULL; ai = ai->ai_next) {
+    cb->sock_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
     if (cb->sock_fd < 0)
       continue;
 
     set_sock_opts(cb->sock_fd);
 
-    status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    status = connect(cb->sock_fd, ai->ai_addr, ai->ai_addrlen);
     if (status != 0) {
       close(cb->sock_fd);
       cb->sock_fd = -1;
@@ -223,6 +249,12 @@ static int wt_callback_init(struct wt_callback *cb) {
     return -1;
   }
 
+  if (0 == cb->connect_failed_log_enabled) {
+    WARNING("write_tsdb plugin: Connecting to %s:%s succeeded.", node, service);
+    cb->connect_failed_log_enabled = 1;
+  }
+  cb->connect_dns_failed_attempts_remaining = 1;
+
   wt_reset_buffer(cb);
 
   return 0;
@@ -247,6 +279,7 @@ static void wt_callback_free(void *data) {
   sfree(cb->service);
   sfree(cb->host_tags);
 
+  pthread_mutex_unlock(&cb->send_lock);
   pthread_mutex_destroy(&cb->send_lock);
 
   sfree(cb);
@@ -293,7 +326,7 @@ static int wt_format_values(char *ret, size_t ret_len, int ds_num,
 
 #define BUFFER_ADD(...)                                                        \
   do {                                                                         \
-    status = ssnprintf(ret + offset, ret_len - offset, __VA_ARGS__);           \
+    status = snprintf(ret + offset, ret_len - offset, __VA_ARGS__);            \
     if (status < 1) {                                                          \
       sfree(rates);                                                            \
       return -1;                                                               \
@@ -356,36 +389,36 @@ static int wt_format_name(char *ret, int ret_len, const value_list_t *vl,
   if (ds_name != NULL) {
     if (vl->plugin_instance[0] == '\0') {
       if (vl->type_instance[0] == '\0') {
-        ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type,
-                  ds_name);
+        snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin, vl->type,
+                 ds_name);
       } else {
-        ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type,
-                  vl->type_instance, ds_name);
+        snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin, vl->type,
+                 vl->type_instance, ds_name);
       }
     } else { /* vl->plugin_instance != "" */
       if (vl->type_instance[0] == '\0') {
-        ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
-                  vl->plugin_instance, vl->type, ds_name);
+        snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
+                 vl->plugin_instance, vl->type, ds_name);
       } else {
-        ssnprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin,
-                  vl->plugin_instance, vl->type, vl->type_instance, ds_name);
+        snprintf(ret, ret_len, "%s%s.%s.%s.%s.%s", prefix, vl->plugin,
+                 vl->plugin_instance, vl->type, vl->type_instance, ds_name);
       }
     }
   } else { /* ds_name == NULL */
     if (vl->plugin_instance[0] == '\0') {
       if (vl->type_instance[0] == '\0') {
-        ssnprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type);
+        snprintf(ret, ret_len, "%s%s.%s", prefix, vl->plugin, vl->type);
       } else {
-        ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
-                  vl->type_instance, vl->type);
+        snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
+                 vl->type_instance, vl->type);
       }
     } else { /* vl->plugin_instance != "" */
       if (vl->type_instance[0] == '\0') {
-        ssnprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
-                  vl->plugin_instance, vl->type);
+        snprintf(ret, ret_len, "%s%s.%s.%s", prefix, vl->plugin,
+                 vl->plugin_instance, vl->type);
       } else {
-        ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
-                  vl->plugin_instance, vl->type, vl->type_instance);
+        snprintf(ret, ret_len, "%s%s.%s.%s.%s", prefix, vl->plugin,
+                 vl->plugin_instance, vl->type, vl->type_instance);
       }
     }
   }
@@ -424,8 +457,8 @@ static int wt_send_message(const char *key, const char *value, cdtime_t time,
   }
 
   status =
-      ssnprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n",
-                key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags);
+      snprintf(message, sizeof(message), "put %s %.0f %s fqdn=%s %s %s\r\n",
+               key, CDTIME_T_TO_DOUBLE(time), value, host, tags, host_tags);
   sfree(temp);
   if (status < 0)
     return -1;
@@ -550,6 +583,8 @@ static int wt_config_tsd(oconfig_item_t *ci) {
     return -1;
   }
   cb->sock_fd = -1;
+  cb->connect_failed_log_enabled = 1;
+  cb->next_random_ttl = new_random_ttl();
 
   pthread_mutex_init(&cb->send_lock, NULL);
 
@@ -573,9 +608,9 @@ static int wt_config_tsd(oconfig_item_t *ci) {
     }
   }
 
-  ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s",
-            cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
-            cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
+  snprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s",
+           cb->node != NULL ? cb->node : WT_DEFAULT_NODE,
+           cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE);
 
   user_data_t user_data = {.data = cb, .free_func = wt_callback_free};
 
@@ -588,16 +623,19 @@ static int wt_config_tsd(oconfig_item_t *ci) {
 }
 
 static int wt_config(oconfig_item_t *ci) {
+  if ((resolve_interval == 0) && (resolve_jitter == 0))
+    resolve_interval = resolve_jitter = plugin_get_interval();
+
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp("Node", child->key) == 0)
       wt_config_tsd(child);
-    if (strcasecmp("DNS_Cache_TTL", child->key) == 0) {
-      int ttl;
-      cf_util_get_int(child, &ttl);
-      dnsttl = TIME_T_TO_CDTIME_T(ttl);
-    else {
+    else if (strcasecmp("ResolveInterval", child->key) == 0)
+      cf_util_get_cdtime(child, &resolve_interval);
+    else if (strcasecmp("ResolveJitter", child->key) == 0)
+      cf_util_get_cdtime(child, &resolve_jitter);
+    else {
       ERROR("write_tsdb plugin: Invalid configuration "
             "option: %s.",
             child->key);
@@ -610,5 +648,3 @@ static int wt_config(oconfig_item_t *ci) {
 void module_register(void) {
   plugin_register_complex_config("write_tsdb", wt_config);
 }
-
-/* vim: set sw=4 ts=4 sts=4 tw=78 et : */