Merge branch 'collectd-5.6' into collectd-5.7
[collectd.git] / src / daemon / plugin.c
index 7a503c7..0f06737 100644 (file)
@@ -25,6 +25,9 @@
  *   Sebastian Harl <sh at tokkee.org>
  **/
 
+/* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
+#define _GNU_SOURCE
+
 #include "collectd.h"
 
 #include "common.h"
 #include "utils_random.h"
 #include "utils_time.h"
 
+#if HAVE_PTHREAD_NP_H
+#include <pthread_np.h> /* for pthread_set_name_np(3) */
+#endif
+
 #include <ltdl.h>
 
 /*
@@ -105,7 +112,7 @@ static c_avl_tree_t *data_sets;
 static char *plugindir = NULL;
 
 #ifndef DEFAULT_MAX_READ_INTERVAL
-#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T(86400)
+#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
 #endif
 static c_heap_t *read_heap = NULL;
 static llist_t *read_list;
@@ -113,7 +120,7 @@ static int read_loop = 1;
 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t *read_threads = NULL;
-static int read_threads_num = 0;
+static size_t read_threads_num = 0;
 static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
 
 static write_queue_t *write_queue_head;
@@ -146,34 +153,27 @@ static const char *plugin_get_dir(void) {
     return (plugindir);
 }
 
-static void plugin_update_internal_statistics(void) { /* {{{ */
-  derive_t copy_write_queue_length;
-  value_list_t vl = VALUE_LIST_INIT;
-  value_t values[2];
-
-  copy_write_queue_length = write_queue_length;
+static int plugin_update_internal_statistics(void) { /* {{{ */
+  gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
 
   /* Initialize `vl' */
-  vl.values = values;
-  vl.values_len = 2;
-  vl.time = 0;
-  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
+  value_list_t vl = VALUE_LIST_INIT;
   sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
-
-  vl.type_instance[0] = 0;
-  vl.values_len = 1;
+  vl.interval = plugin_get_interval();
 
   /* Write queue */
   sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
 
   /* Write queue : queue length */
-  vl.values[0].gauge = (gauge_t)copy_write_queue_length;
+  vl.values = &(value_t){.gauge = copy_write_queue_length};
+  vl.values_len = 1;
   sstrncpy(vl.type, "queue_length", sizeof(vl.type));
   vl.type_instance[0] = 0;
   plugin_dispatch_values(&vl);
 
   /* Write queue : Values dropped (queue length > low limit) */
-  vl.values[0].derive = (derive_t)stats_values_dropped;
+  vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
+  vl.values_len = 1;
   sstrncpy(vl.type, "derive", sizeof(vl.type));
   sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
   plugin_dispatch_values(&vl);
@@ -182,13 +182,14 @@ static void plugin_update_internal_statistics(void) { /* {{{ */
   sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
 
   /* Cache : Nb entry in cache tree */
-  vl.values[0].gauge = (gauge_t)uc_get_size();
+  vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
+  vl.values_len = 1;
   sstrncpy(vl.type, "cache_size", sizeof(vl.type));
   vl.type_instance[0] = 0;
   plugin_dispatch_values(&vl);
 
-  return;
-} /* }}} void plugin_update_internal_statistics */
+  return 0;
+} /* }}} int plugin_update_internal_statistics */
 
 static void destroy_callback(callback_func_t *cf) /* {{{ */
 {
@@ -486,11 +487,8 @@ static void *plugin_read_thread(void __attribute__((unused)) * args) {
      * pthread_cond_timedwait returns. */
     rc = 0;
     while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
-      struct timespec ts = {0};
-
-      CDTIME_T_TO_TIMESPEC(rf->rf_next_read, &ts);
-
-      rc = pthread_cond_timedwait(&read_cond, &read_lock, &ts);
+      rc = pthread_cond_timedwait(&read_cond, &read_lock,
+                                  &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
     }
 
     /* Must hold `read_lock' when accessing `rf->rf_type'. */
@@ -601,7 +599,38 @@ static void *plugin_read_thread(void __attribute__((unused)) * args) {
   return ((void *)0);
 } /* void *plugin_read_thread */
 
-static void start_read_threads(int num) {
+#ifdef PTHREAD_MAX_NAMELEN_NP
+#define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
+#else
+#define THREAD_NAME_MAX 16
+#endif
+
+static void set_thread_name(pthread_t tid, char const *name) {
+#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
+
+  /* glibc limits the length of the name and fails if the passed string
+   * is too long, so we truncate it here. */
+  char n[THREAD_NAME_MAX];
+  if (strlen(name) >= THREAD_NAME_MAX)
+    WARNING("set_thread_name(\"%s\"): name too long", name);
+  sstrncpy(n, name, sizeof(n));
+
+#if defined(HAVE_PTHREAD_SETNAME_NP)
+  int status = pthread_setname_np(tid, n);
+  if (status != 0) {
+    char errbuf[1024];
+    ERROR("set_thread_name(\"%s\"): %s", n,
+          sstrerror(status, errbuf, sizeof(errbuf)));
+  }
+#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
+  pthread_set_name_np(tid, n);
+#endif
+
+#endif
+}
+
+static void start_read_threads(size_t num) /* {{{ */
+{
   if (read_threads != NULL)
     return;
 
@@ -612,22 +641,31 @@ static void start_read_threads(int num) {
   }
 
   read_threads_num = 0;
-  for (int i = 0; i < num; i++) {
-    if (pthread_create(read_threads + read_threads_num, NULL,
-                       plugin_read_thread, NULL) == 0) {
-      read_threads_num++;
-    } else {
-      ERROR("plugin: start_read_threads: pthread_create failed.");
+  for (size_t i = 0; i < num; i++) {
+    int status = pthread_create(read_threads + read_threads_num,
+                                /* attr = */ NULL, plugin_read_thread,
+                                /* arg = */ NULL);
+    if (status != 0) {
+      char errbuf[1024];
+      ERROR("plugin: start_read_threads: pthread_create failed "
+            "with status %i (%s).",
+            status, sstrerror(status, errbuf, sizeof(errbuf)));
       return;
     }
+
+    char name[THREAD_NAME_MAX];
+    ssnprintf(name, sizeof(name), "reader#%zu", read_threads_num);
+    set_thread_name(read_threads[read_threads_num], name);
+
+    read_threads_num++;
   } /* for (i) */
-} /* void start_read_threads */
+} /* }}} void start_read_threads */
 
 static void stop_read_threads(void) {
   if (read_threads == NULL)
     return;
 
-  INFO("collectd: Stopping %i read threads.", read_threads_num);
+  INFO("collectd: Stopping %zu read threads.", read_threads_num);
 
   pthread_mutex_lock(&read_lock);
   read_loop = 0;
@@ -635,7 +673,7 @@ static void stop_read_threads(void) {
   pthread_cond_broadcast(&read_cond);
   pthread_mutex_unlock(&read_lock);
 
-  for (int i = 0; i < read_threads_num; i++) {
+  for (size_t i = 0; i < read_threads_num; i++) {
     if (pthread_join(read_threads[i], NULL) != 0) {
       ERROR("plugin: stop_read_threads: pthread_join failed.");
     }
@@ -668,6 +706,9 @@ plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
     return (NULL);
   memcpy(vl, vl_orig, sizeof(*vl));
 
+  if (vl->host[0] == 0)
+    sstrncpy(vl->host, hostname_g, sizeof(vl->host));
+
   vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
   if (vl->values == NULL) {
     plugin_value_list_free(vl);
@@ -808,11 +849,9 @@ static void start_write_threads(size_t num) /* {{{ */
 
   write_threads_num = 0;
   for (size_t i = 0; i < num; i++) {
-    int status;
-
-    status = pthread_create(write_threads + write_threads_num,
-                            /* attr = */ NULL, plugin_write_thread,
-                            /* arg = */ NULL);
+    int status = pthread_create(write_threads + write_threads_num,
+                                /* attr = */ NULL, plugin_write_thread,
+                                /* arg = */ NULL);
     if (status != 0) {
       char errbuf[1024];
       ERROR("plugin: start_write_threads: pthread_create failed "
@@ -821,6 +860,10 @@ static void start_write_threads(size_t num) /* {{{ */
       return;
     }
 
+    char name[THREAD_NAME_MAX];
+    ssnprintf(name, sizeof(name), "writer#%zu", write_threads_num);
+    set_thread_name(write_threads[write_threads_num], name);
+
     write_threads_num++;
   } /* for (i) */
 } /* }}} void start_write_threads */
@@ -1528,8 +1571,10 @@ int plugin_init_all(void) {
   /* Init the value cache */
   uc_init();
 
-  if (IS_TRUE(global_option_get("CollectInternalStats")))
+  if (IS_TRUE(global_option_get("CollectInternalStats"))) {
     record_statistics = 1;
+    plugin_register_read("collectd", plugin_update_internal_statistics);
+  }
 
   chain_name = global_option_get("PreCacheChain");
   pre_cache_chain = fc_chain_get_by_name(chain_name);
@@ -1610,16 +1655,13 @@ int plugin_init_all(void) {
     rt = global_option_get("ReadThreads");
     num = atoi(rt);
     if (num != -1)
-      start_read_threads((num > 0) ? num : 5);
+      start_read_threads((num > 0) ? ((size_t)num) : 5);
   }
   return ret;
 } /* void plugin_init_all */
 
 /* TODO: Rename this function. */
 void plugin_read_all(void) {
-  if (record_statistics) {
-    plugin_update_internal_statistics();
-  }
   uc_check_timeout();
 
   return;
@@ -1882,15 +1924,16 @@ static int plugin_dispatch_values_internal(value_list_t *vl) {
   int status;
   static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
 
-  value_t *saved_values;
-  int saved_values_len;
-
   data_set_t *ds;
 
-  int free_meta_data = 0;
+  _Bool free_meta_data = 0;
+
+  assert(vl != NULL);
 
-  assert(vl);
-  assert(vl->plugin);
+  /* These fields are initialized by plugin_value_list_clone() if needed: */
+  assert(vl->host[0] != 0);
+  assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
+  assert(vl->interval != 0);
 
   if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) {
     ERROR("plugin_dispatch_values: Invalid value list "
@@ -1928,11 +1971,6 @@ static int plugin_dispatch_values_internal(value_list_t *vl) {
     return (-1);
   }
 
-  /* Assured by plugin_value_list_clone(). The time is determined at
-   * _enqueue_ time. */
-  assert(vl->time != 0);
-  assert(vl->interval != 0);
-
   DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
         "host = %s; "
         "plugin = %s; plugin_instance = %s; "
@@ -1966,26 +2004,6 @@ static int plugin_dispatch_values_internal(value_list_t *vl) {
   escape_slashes(vl->type, sizeof(vl->type));
   escape_slashes(vl->type_instance, sizeof(vl->type_instance));
 
-  /* Copy the values. This way, we can assure `targets' that they get
-   * dynamically allocated values, which they can free and replace if
-   * they like. */
-  if ((pre_cache_chain != NULL) || (post_cache_chain != NULL)) {
-    saved_values = vl->values;
-    saved_values_len = vl->values_len;
-
-    vl->values = (value_t *)calloc(vl->values_len, sizeof(*vl->values));
-    if (vl->values == NULL) {
-      ERROR("plugin_dispatch_values: calloc failed.");
-      vl->values = saved_values;
-      return (-1);
-    }
-    memcpy(vl->values, saved_values, vl->values_len * sizeof(*vl->values));
-  } else /* if ((pre == NULL) && (post == NULL)) */
-  {
-    saved_values = NULL;
-    saved_values_len = 0;
-  }
-
   if (pre_cache_chain != NULL) {
     status = fc_process_chain(ds, vl, pre_cache_chain);
     if (status < 0) {
@@ -1993,16 +2011,8 @@ static int plugin_dispatch_values_internal(value_list_t *vl) {
               "pre-cache chain failed with "
               "status %i (%#x).",
               status, status);
-    } else if (status == FC_TARGET_STOP) {
-      /* Restore the state of the value_list so that plugins
-       * don't get confused.. */
-      if (saved_values != NULL) {
-        sfree(vl->values);
-        vl->values = saved_values;
-        vl->values_len = saved_values_len;
-      }
+    } else if (status == FC_TARGET_STOP)
       return (0);
-    }
   }
 
   /* Update the value cache */
@@ -2019,14 +2029,6 @@ static int plugin_dispatch_values_internal(value_list_t *vl) {
   } else
     fc_default_action(ds, vl);
 
-  /* Restore the state of the value_list so that plugins don't get
-   * confused.. */
-  if (saved_values != NULL) {
-    sfree(vl->values);
-    vl->values = saved_values;
-    vl->values_len = saved_values_len;
-  }
-
   if ((free_meta_data != 0) && (vl->meta != NULL)) {
     meta_data_destroy(vl->meta);
     vl->meta = NULL;
@@ -2565,18 +2567,28 @@ static void *plugin_thread_start(void *arg) {
 } /* void *plugin_thread_start */
 
 int plugin_thread_create(pthread_t *thread, const pthread_attr_t *attr,
-                         void *(*start_routine)(void *), void *arg) {
+                         void *(*start_routine)(void *), void *arg,
+                         char const *name) {
   plugin_thread_t *plugin_thread;
 
   plugin_thread = malloc(sizeof(*plugin_thread));
   if (plugin_thread == NULL)
-    return -1;
+    return ENOMEM;
 
   plugin_thread->ctx = plugin_get_ctx();
   plugin_thread->start_routine = start_routine;
   plugin_thread->arg = arg;
 
-  return pthread_create(thread, attr, plugin_thread_start, plugin_thread);
+  int ret = pthread_create(thread, attr, plugin_thread_start, plugin_thread);
+  if (ret != 0) {
+    sfree(plugin_thread);
+    return ret;
+  }
+
+  if (name != NULL)
+    set_thread_name(*thread, name);
+
+  return 0;
 } /* int plugin_thread_create */
 
 /* vim: set sw=8 ts=8 noet fdm=marker : */