src/plugin.[ch]: Use a pool of write threads to dispatch values to write plugins.
authorFlorian Forster <octo@collectd.org>
Mon, 21 Jan 2013 09:27:55 +0000 (10:27 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 21 Jan 2013 09:27:55 +0000 (10:27 +0100)
This fixes Github issue #75.

src/collectd.conf.in
src/collectd.conf.pod
src/configfile.c
src/plugin.c
src/plugin.h

index dead93b..e7428fb 100644 (file)
@@ -28,6 +28,7 @@
 
 #Timeout      2
 #ReadThreads  5
+#WriteThreads 5
 
 ##############################################################################
 # Logging                                                                    #
index 01346e1..74a8cfc 100644 (file)
@@ -181,8 +181,14 @@ see L<collectd-threshold(5)> for details.
 
 Number of threads to start for reading plugins. The default value is B<5>, but
 you may want to increase this if you have more than five plugins that take a
-long time to read. Mostly those are plugin that do network-IO. Setting this to
-a value higher than the number of plugins you've loaded is totally useless.
+long time to read. Mostly those are plugins that do network-IO. Setting this to
+a value higher than the number of registered read callbacks is not recommended.
+
+=item B<WriteThreads> I<Num>
+
+Number of threads to start for dispatching value lists to write plugins. The
+default value is B<5>, but you may want to increase this if you have more than
+five plugins that may take relatively long to write to.
 
 =item B<Hostname> I<Name>
 
index b16ae47..ac5e8ed 100644 (file)
@@ -108,6 +108,7 @@ static cf_global_option_t cf_global_options[] =
        {"FQDNLookup",  NULL, "true"},
        {"Interval",    NULL, NULL},
        {"ReadThreads", NULL, "5"},
+       {"WriteThreads", NULL, "5"},
        {"Timeout",     NULL, "2"},
        {"PreCacheChain",  NULL, "PreCache"},
        {"PostCacheChain", NULL, "PostCache"}
index bbede05..0aa33b1 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/plugin.c
- * Copyright (C) 2005-2011  Florian octo Forster
+ * Copyright (C) 2005-2013  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -69,6 +69,14 @@ struct read_func_s
 };
 typedef struct read_func_s read_func_t;
 
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+       value_list_t *vl;
+       write_queue_t *next;
+};
+
 /*
  * Private variables
  */
@@ -95,12 +103,22 @@ static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
 static int             read_threads_num = 0;
 
+static write_queue_t  *write_queue_head;
+static write_queue_t  *write_queue_tail;
+static _Bool           write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t      *write_threads = NULL;
+static size_t          write_threads_num = 0;
+
 static pthread_key_t   plugin_ctx_key;
 static _Bool           plugin_ctx_key_initialized = 0;
 
 /*
  * Static functions
  */
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
 static const char *plugin_get_dir (void)
 {
        if (plugindir == NULL)
@@ -573,6 +591,210 @@ static void stop_read_threads (void)
        read_threads_num = 0;
 } /* void stop_read_threads */
 
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+       if (vl == NULL)
+               return;
+
+       meta_data_destroy (vl->meta);
+       sfree (vl->values);
+       sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+       value_list_t *vl;
+
+       if (vl_orig == NULL)
+               return (NULL);
+
+       vl = malloc (sizeof (*vl));
+       if (vl == NULL)
+               return (NULL);
+       memcpy (vl, vl_orig, sizeof (*vl));
+
+       vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+       if (vl->values == NULL)
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+       memcpy (vl->values, vl_orig->values,
+                       vl_orig->values_len * sizeof (*vl->values));
+
+       vl->meta = meta_data_clone (vl->meta);
+       if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+
+       return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+       write_queue_t *q;
+
+       q = malloc (sizeof (*q));
+       if (q == NULL)
+               return (ENOMEM);
+       q->next = NULL;
+
+       q->vl = plugin_value_list_clone (vl);
+       if (q->vl == NULL)
+       {
+               sfree (q);
+               return (ENOMEM);
+       }
+
+       pthread_mutex_lock (&write_lock);
+
+       if (write_queue_tail == NULL)
+       {
+               write_queue_head = q;
+               write_queue_tail = q;
+       }
+       else
+       {
+               write_queue_tail->next = q;
+               write_queue_tail = q;
+       }
+
+       pthread_cond_signal (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+       write_queue_t *q;
+       value_list_t *vl;
+
+       pthread_mutex_lock (&write_lock);
+
+       while (write_loop && (write_queue_head == NULL))
+               pthread_cond_wait (&write_cond, &write_lock);
+
+       if (write_queue_head == NULL)
+       {
+               pthread_mutex_unlock (&write_lock);
+               return (NULL);
+       }
+
+       q = write_queue_head;
+       write_queue_head = q->next;
+       if (write_queue_head == NULL)
+               write_queue_tail = NULL;
+
+       pthread_mutex_unlock (&write_lock);
+
+       vl = q->vl;
+       sfree (q);
+       return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+       while (write_loop)
+       {
+               value_list_t *vl = plugin_write_dequeue ();
+               if (vl == NULL)
+                       continue;
+
+               plugin_dispatch_values_internal (vl);
+
+               plugin_value_list_free (vl);
+       }
+
+       pthread_exit (NULL);
+       return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+       size_t i;
+
+       if (write_threads != NULL)
+               return;
+
+       write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+       if (write_threads == NULL)
+       {
+               ERROR ("plugin: start_write_threads: calloc failed.");
+               return;
+       }
+
+       write_threads_num = 0;
+       for (i = 0; i < num; i++)
+       {
+               int status;
+
+               status = pthread_create (write_threads + write_threads_num,
+                               /* attr = */ NULL,
+                               plugin_write_thread,
+                               /* arg = */ NULL);
+               if (status != 0)
+               {
+                       char errbuf[1024];
+                       ERROR ("plugin: start_write_threads: pthread_create failed "
+                                       "with status %i (%s).", status,
+                                       sstrerror (status, errbuf, sizeof (errbuf)));
+                       return;
+               }
+
+               write_threads_num++;
+       } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+       write_queue_t *q;
+       int i;
+
+       if (write_threads == NULL)
+               return;
+
+       INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+       pthread_mutex_lock (&write_lock);
+       write_loop = 0;
+       DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+       pthread_cond_broadcast (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       for (i = 0; i < write_threads_num; i++)
+       {
+               if (pthread_join (write_threads[i], NULL) != 0)
+               {
+                       ERROR ("plugin: stop_write_threads: pthread_join failed.");
+               }
+               write_threads[i] = (pthread_t) 0;
+       }
+       sfree (write_threads);
+       write_threads_num = 0;
+
+       pthread_mutex_lock (&write_lock);
+       i = 0;
+       for (q = write_queue_head; q != NULL; q = q->next)
+       {
+               plugin_value_list_free (q->vl);
+               sfree (q);
+               i++;
+       }
+       write_queue_head = NULL;
+       write_queue_tail = NULL;
+       pthread_mutex_unlock (&write_lock);
+
+       if (i > 0)
+       {
+               WARNING ("plugin: %i value list%s left after shutting down "
+                               "the write threads.",
+                               i, (i == 1) ? " was" : "s were");
+       }
+} /* }}} void stop_write_threads */
+
 /*
  * Public functions
  */
@@ -1166,6 +1388,15 @@ void plugin_init_all (void)
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
 
+       {
+               char const *tmp = global_option_get ("WriteThreads");
+               int num = atoi (tmp);
+
+               if (num < 1)
+                       num = 5;
+
+               start_write_threads ((size_t) num);
+       }
 
        if ((list_init == NULL) && (read_heap == NULL))
                return;
@@ -1435,6 +1666,8 @@ void plugin_shutdown_all (void)
                plugin_set_ctx (old_ctx);
        }
 
+       stop_write_threads ();
+
        /* Write plugins which use the `user_data' pointer usually need the
         * same data available to the flush callback. If this is the case, set
         * the free_function to NULL when registering the flush callback and to
@@ -1490,7 +1723,7 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
   return (0);
 } /* int }}} plugin_dispatch_missing */
 
-int plugin_dispatch_values (value_list_t *vl)
+static int plugin_dispatch_values_internal (value_list_t *vl)
 {
        int status;
        static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
@@ -1684,52 +1917,28 @@ int plugin_dispatch_values (value_list_t *vl)
        }
 
        return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
 
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
 {
-  value_list_t vl_copy;
-  int status;
-
-  if (vl == NULL)
-    return EINVAL;
-
-  memcpy (&vl_copy, vl, sizeof (vl_copy));
-
-  /* Write callbacks must not change the values and meta pointers, so we can
-   * savely skip copying those and make this more efficient. */
-  if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
-    return (plugin_dispatch_values (&vl_copy));
-
-  /* Set pointers to NULL, just to be on the save side. */
-  vl_copy.values = NULL;
-  vl_copy.meta = NULL;
-
-  vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
-  if (vl_copy.values == NULL)
-  {
-    ERROR ("plugin_dispatch_values_secure: malloc failed.");
-    return (ENOMEM);
-  }
-  memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
-  if (vl->meta != NULL)
-  {
-    vl_copy.meta = meta_data_clone (vl->meta);
-    if (vl_copy.meta == NULL)
-    {
-      ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
-      free (vl_copy.values);
-      return (ENOMEM);
-    }
-  } /* if (vl->meta) */
+       int status;
 
-  status = plugin_dispatch_values (&vl_copy);
+       status = plugin_write_enqueue (vl);
+       if (status != 0)
+       {
+               char errbuf[1024];
+               ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+                               "with status %i (%s).", status,
+                               sstrerror (status, errbuf, sizeof (errbuf)));
+               return (status);
+       }
 
-  meta_data_destroy (vl_copy.meta);
-  free (vl_copy.values);
+       return (0);
+}
 
-  return (status);
+int plugin_dispatch_values_secure (const value_list_t *vl)
+{
+  return (plugin_dispatch_values (vl));
 } /* int plugin_dispatch_values_secure */
 
 int plugin_dispatch_notification (const notification_t *notif)
index 0f35de5..c28709e 100644 (file)
@@ -326,7 +326,7 @@ int plugin_unregister_notification (const char *name);
  *  `vl'        Value list of the values that have been read by a `read'
  *              function.
  */
-int plugin_dispatch_values (value_list_t *vl);
+int plugin_dispatch_values (value_list_t const *vl);
 int plugin_dispatch_values_secure (const value_list_t *vl);
 int plugin_dispatch_missing (const value_list_t *vl);