src/daemon/plugin.c: Address review comments.
[collectd.git] / src / daemon / plugin.c
index 50ab675..d54abee 100644 (file)
@@ -25,6 +25,9 @@
  *   Sebastian Harl <sh at tokkee.org>
  **/
 
+/* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
+#define _GNU_SOURCE
+
 #include "collectd.h"
 
 #include "common.h"
 #include "utils_time.h"
 #include "utils_random.h"
 
+#if HAVE_PTHREAD_NP_H
+# include <pthread_np.h> /* for pthread_set_name_np(3) */
+#endif
+
 #include <ltdl.h>
 
 /*
@@ -108,7 +115,7 @@ static c_avl_tree_t *data_sets;
 static char *plugindir = NULL;
 
 #ifndef DEFAULT_MAX_READ_INTERVAL
-# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400)
+# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC (86400)
 #endif
 static c_heap_t       *read_heap = NULL;
 static llist_t        *read_list;
@@ -116,7 +123,7 @@ static int             read_loop = 1;
 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
-static int             read_threads_num = 0;
+static size_t          read_threads_num = 0;
 static cdtime_t        max_read_interval = DEFAULT_MAX_READ_INTERVAL;
 
 static write_queue_t  *write_queue_head;
@@ -360,7 +367,7 @@ static void log_list_callbacks (llist_t **list, /* {{{ */
 } /* }}} void log_list_callbacks */
 
 static int create_register_callback (llist_t **list, /* {{{ */
-               const char *name, void *callback, user_data_t *ud)
+               const char *name, void *callback, user_data_t const *ud)
 {
        callback_func_t *cf;
 
@@ -522,12 +529,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                                && (cdtime () < rf->rf_next_read)
                                && rc == 0)
                {
-                       struct timespec ts = { 0 };
-
-                       CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
-
                        rc = pthread_cond_timedwait (&read_cond, &read_lock,
-                               &ts);
+                               &CDTIME_T_TO_TIMESPEC (rf->rf_next_read));
                }
 
                /* Must hold `read_lock' when accessing `rf->rf_type'. */
@@ -648,7 +651,38 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
        return ((void *) 0);
 } /* void *plugin_read_thread */
 
-static void start_read_threads (int num)
+#ifdef PTHREAD_MAX_NAMELEN_NP
+# define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
+#else
+# define THREAD_NAME_MAX 16
+#endif
+
+static void set_thread_name(pthread_t tid, char const *name) {
+#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)
+
+       /* glibc limits the length of the name and fails if the passed string
+        * is too long, so we truncate it here. */
+       char n[THREAD_NAME_MAX];
+       if (strlen (name) >= THREAD_NAME_MAX)
+               WARNING("set_thread_name(\"%s\"): name too long", name);
+       sstrncpy (n, name, sizeof(n));
+
+#if defined(HAVE_PTHREAD_SETNAME_NP)
+       int status = pthread_setname_np (tid, n);
+       if (status != 0)
+       {
+               char errbuf[1024];
+               ERROR ("set_thread_name(\"%s\"): %s", n,
+                               sstrerror (status, errbuf, sizeof(errbuf)));
+       }
+#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
+       pthread_set_name_np (tid, n);
+#endif
+
+#endif
+}
+
+static void start_read_threads (size_t num) /* {{{ */
 {
        if (read_threads != NULL)
                return;
@@ -661,27 +695,35 @@ static void start_read_threads (int num)
        }
 
        read_threads_num = 0;
-       for (int i = 0; i < num; i++)
+       for (size_t i = 0; i < num; i++)
        {
-               if (pthread_create (read_threads + read_threads_num, NULL,
-                                       plugin_read_thread, NULL) == 0)
-               {
-                       read_threads_num++;
-               }
-               else
+               int status = pthread_create (read_threads + read_threads_num,
+                               /* attr = */ NULL,
+                               plugin_read_thread,
+                               /* arg = */ NULL);
+               if (status != 0)
                {
-                       ERROR ("plugin: start_read_threads: pthread_create failed.");
+                       char errbuf[1024];
+                       ERROR ("plugin: start_read_threads: pthread_create failed "
+                                       "with status %i (%s).", status,
+                                       sstrerror (status, errbuf, sizeof (errbuf)));
                        return;
                }
+
+               char name[THREAD_NAME_MAX];
+               ssnprintf (name, sizeof (name), "reader#%zu", read_threads_num);
+               set_thread_name (read_threads[read_threads_num], name);
+
+               read_threads_num++;
        } /* for (i) */
-} /* void start_read_threads */
+} /* }}} void start_read_threads */
 
 static void stop_read_threads (void)
 {
        if (read_threads == NULL)
                return;
 
-       INFO ("collectd: Stopping %i read threads.", read_threads_num);
+       INFO ("collectd: Stopping %zu read threads.", read_threads_num);
 
        pthread_mutex_lock (&read_lock);
        read_loop = 0;
@@ -689,7 +731,7 @@ static void stop_read_threads (void)
        pthread_cond_broadcast (&read_cond);
        pthread_mutex_unlock (&read_lock);
 
-       for (int i = 0; i < read_threads_num; i++)
+       for (size_t i = 0; i < read_threads_num; i++)
        {
                if (pthread_join (read_threads[i], NULL) != 0)
                {
@@ -877,9 +919,7 @@ static void start_write_threads (size_t num) /* {{{ */
        write_threads_num = 0;
        for (size_t i = 0; i < num; i++)
        {
-               int status;
-
-               status = pthread_create (write_threads + write_threads_num,
+               int status = pthread_create (write_threads + write_threads_num,
                                /* attr = */ NULL,
                                plugin_write_thread,
                                /* arg = */ NULL);
@@ -892,6 +932,10 @@ static void start_write_threads (size_t num) /* {{{ */
                        return;
                }
 
+               char name[THREAD_NAME_MAX];
+               ssnprintf (name, sizeof (name), "writer#%zu", write_threads_num);
+               set_thread_name (write_threads[write_threads_num], name);
+
                write_threads_num++;
        } /* for (i) */
 } /* }}} void start_write_threads */
@@ -1264,7 +1308,7 @@ int plugin_register_read (const char *name,
 int plugin_register_complex_read (const char *group, const char *name,
                plugin_read_cb callback,
                cdtime_t interval,
-               user_data_t *user_data)
+               user_data_t const *user_data)
 {
        read_func_t *rf;
        int status;
@@ -1308,7 +1352,7 @@ int plugin_register_complex_read (const char *group, const char *name,
 } /* int plugin_register_complex_read */
 
 int plugin_register_write (const char *name,
-               plugin_write_cb callback, user_data_t *ud)
+               plugin_write_cb callback, user_data_t const *ud)
 {
        return (create_register_callback (&list_write, name,
                                (void *) callback, ud));
@@ -1355,7 +1399,7 @@ static char *plugin_flush_callback_name (const char *name)
 } /* static char *plugin_flush_callback_name */
 
 int plugin_register_flush (const char *name,
-               plugin_flush_cb callback, user_data_t *ud)
+               plugin_flush_cb callback, user_data_t const *ud)
 {
        int status;
        plugin_ctx_t ctx = plugin_get_ctx ();
@@ -1392,15 +1436,15 @@ int plugin_register_flush (const char *name,
                }
                cb->timeout = ctx.flush_timeout;
 
-               ud->data = cb;
-               ud->free_func = plugin_flush_timeout_callback_free;
-
                status = plugin_register_complex_read (
                        /* group     = */ "flush",
                        /* name      = */ flush_name,
                        /* callback  = */ plugin_flush_timeout_callback,
                        /* interval  = */ ctx.flush_interval,
-                       /* user data = */ ud);
+                       /* user data = */ &(user_data_t) {
+                               .data = cb,
+                               .free_func = plugin_flush_timeout_callback_free,
+                       });
 
                sfree (flush_name);
                if (status != 0)
@@ -1415,7 +1459,7 @@ int plugin_register_flush (const char *name,
 } /* int plugin_register_flush */
 
 int plugin_register_missing (const char *name,
-               plugin_missing_cb callback, user_data_t *ud)
+               plugin_missing_cb callback, user_data_t const *ud)
 {
        return (create_register_callback (&list_missing, name,
                                (void *) callback, ud));
@@ -1486,14 +1530,14 @@ int plugin_register_data_set (const data_set_t *ds)
 } /* int plugin_register_data_set */
 
 int plugin_register_log (const char *name,
-               plugin_log_cb callback, user_data_t *ud)
+               plugin_log_cb callback, user_data_t const *ud)
 {
        return (create_register_callback (&list_log, name,
                                (void *) callback, ud));
 } /* int plugin_register_log */
 
 int plugin_register_notification (const char *name,
-               plugin_notification_cb callback, user_data_t *ud)
+               plugin_notification_cb callback, user_data_t const *ud)
 {
        return (create_register_callback (&list_notification, name,
                                (void *) callback, ud));
@@ -1781,7 +1825,7 @@ int plugin_init_all (void)
                rt = global_option_get ("ReadThreads");
                num = atoi (rt);
                if (num != -1)
-                       start_read_threads ((num > 0) ? num : 5);
+                       start_read_threads ((num > 0) ? ((size_t) num) : 5);
        }
        return ret;
 } /* void plugin_init_all */
@@ -2793,20 +2837,30 @@ static void *plugin_thread_start (void *arg)
 } /* void *plugin_thread_start */
 
 int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
-               void *(*start_routine) (void *), void *arg)
+               void *(*start_routine) (void *), void *arg, char const *name)
 {
        plugin_thread_t *plugin_thread;
 
        plugin_thread = malloc (sizeof (*plugin_thread));
        if (plugin_thread == NULL)
-               return -1;
+               return ENOMEM;
 
        plugin_thread->ctx           = plugin_get_ctx ();
        plugin_thread->start_routine = start_routine;
        plugin_thread->arg           = arg;
 
-       return pthread_create (thread, attr,
+       int ret = pthread_create (thread, attr,
                        plugin_thread_start, plugin_thread);
+       if (ret != 0)
+       {
+               sfree (plugin_thread);
+               return ret;
+       }
+
+       if (name != NULL)
+               set_thread_name (*thread, name);
+
+       return 0;
 } /* int plugin_thread_create */
 
 /* vim: set sw=8 ts=8 noet fdm=marker : */