From: Florian Forster Date: Mon, 21 Jan 2013 09:27:55 +0000 (+0100) Subject: src/plugin.[ch]: Use a pool of write threads to dispatch values to write plugins. X-Git-Tag: collectd-5.3.0~58 X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=67c4689b74c69cbd3444e9860eae07bad439066b src/plugin.[ch]: Use a pool of write threads to dispatch values to write plugins. This fixes Github issue #75. --- diff --git a/src/collectd.conf.in b/src/collectd.conf.in index dead93b6..e7428fb0 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -28,6 +28,7 @@ #Timeout 2 #ReadThreads 5 +#WriteThreads 5 ############################################################################## # Logging # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 01346e14..74a8cfc7 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -181,8 +181,14 @@ see L for details. Number of threads to start for reading plugins. The default value is B<5>, but you may want to increase this if you have more than five plugins that take a -long time to read. Mostly those are plugin that do network-IO. Setting this to -a value higher than the number of plugins you've loaded is totally useless. +long time to read. Mostly those are plugins that do network-IO. Setting this to +a value higher than the number of registered read callbacks is not recommended. + +=item B I + +Number of threads to start for dispatching value lists to write plugins. The +default value is B<5>, but you may want to increase this if you have more than +five plugins that may take relatively long to write to. =item B I diff --git a/src/configfile.c b/src/configfile.c index b16ae476..ac5e8edc 100644 --- a/src/configfile.c +++ b/src/configfile.c @@ -108,6 +108,7 @@ static cf_global_option_t cf_global_options[] = {"FQDNLookup", NULL, "true"}, {"Interval", NULL, NULL}, {"ReadThreads", NULL, "5"}, + {"WriteThreads", NULL, "5"}, {"Timeout", NULL, "2"}, {"PreCacheChain", NULL, "PreCache"}, {"PostCacheChain", NULL, "PostCache"} diff --git a/src/plugin.c b/src/plugin.c index bbede051..0aa33b1d 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1,6 +1,6 @@ /** * collectd - src/plugin.c - * Copyright (C) 2005-2011 Florian octo Forster + * Copyright (C) 2005-2013 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -69,6 +69,14 @@ struct read_func_s }; typedef struct read_func_s read_func_t; +struct write_queue_s; +typedef struct write_queue_s write_queue_t; +struct write_queue_s +{ + value_list_t *vl; + write_queue_t *next; +}; + /* * Private variables */ @@ -95,12 +103,22 @@ static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; static int read_threads_num = 0; +static write_queue_t *write_queue_head; +static write_queue_t *write_queue_tail; +static _Bool write_loop = 1; +static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; +static pthread_t *write_threads = NULL; +static size_t write_threads_num = 0; + static pthread_key_t plugin_ctx_key; static _Bool plugin_ctx_key_initialized = 0; /* * Static functions */ +static int plugin_dispatch_values_internal (value_list_t *vl); + static const char *plugin_get_dir (void) { if (plugindir == NULL) @@ -573,6 +591,210 @@ static void stop_read_threads (void) read_threads_num = 0; } /* void stop_read_threads */ +static void plugin_value_list_free (value_list_t *vl) /* {{{ */ +{ + if (vl == NULL) + return; + + meta_data_destroy (vl->meta); + sfree (vl->values); + sfree (vl); +} /* }}} void plugin_value_list_free */ + +static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */ +{ + value_list_t *vl; + + if (vl_orig == NULL) + return (NULL); + + vl = malloc (sizeof (*vl)); + if (vl == NULL) + return (NULL); + memcpy (vl, vl_orig, sizeof (*vl)); + + vl->values = calloc (vl_orig->values_len, sizeof (*vl->values)); + if (vl->values == NULL) + { + plugin_value_list_free (vl); + return (NULL); + } + memcpy (vl->values, vl_orig->values, + vl_orig->values_len * sizeof (*vl->values)); + + vl->meta = meta_data_clone (vl->meta); + if ((vl_orig->meta != NULL) && (vl->meta == NULL)) + { + plugin_value_list_free (vl); + return (NULL); + } + + return (vl); +} /* }}} value_list_t *plugin_value_list_clone */ + +static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ +{ + write_queue_t *q; + + q = malloc (sizeof (*q)); + if (q == NULL) + return (ENOMEM); + q->next = NULL; + + q->vl = plugin_value_list_clone (vl); + if (q->vl == NULL) + { + sfree (q); + return (ENOMEM); + } + + pthread_mutex_lock (&write_lock); + + if (write_queue_tail == NULL) + { + write_queue_head = q; + write_queue_tail = q; + } + else + { + write_queue_tail->next = q; + write_queue_tail = q; + } + + pthread_cond_signal (&write_cond); + pthread_mutex_unlock (&write_lock); + + return (0); +} /* }}} int plugin_write_enqueue */ + +static value_list_t *plugin_write_dequeue (void) /* {{{ */ +{ + write_queue_t *q; + value_list_t *vl; + + pthread_mutex_lock (&write_lock); + + while (write_loop && (write_queue_head == NULL)) + pthread_cond_wait (&write_cond, &write_lock); + + if (write_queue_head == NULL) + { + pthread_mutex_unlock (&write_lock); + return (NULL); + } + + q = write_queue_head; + write_queue_head = q->next; + if (write_queue_head == NULL) + write_queue_tail = NULL; + + pthread_mutex_unlock (&write_lock); + + vl = q->vl; + sfree (q); + return (vl); +} /* }}} value_list_t *plugin_write_dequeue */ + +static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */ +{ + while (write_loop) + { + value_list_t *vl = plugin_write_dequeue (); + if (vl == NULL) + continue; + + plugin_dispatch_values_internal (vl); + + plugin_value_list_free (vl); + } + + pthread_exit (NULL); + return ((void *) 0); +} /* }}} void *plugin_write_thread */ + +static void start_write_threads (size_t num) /* {{{ */ +{ + size_t i; + + if (write_threads != NULL) + return; + + write_threads = (pthread_t *) calloc (num, sizeof (pthread_t)); + if (write_threads == NULL) + { + ERROR ("plugin: start_write_threads: calloc failed."); + return; + } + + write_threads_num = 0; + for (i = 0; i < num; i++) + { + int status; + + status = pthread_create (write_threads + write_threads_num, + /* attr = */ NULL, + plugin_write_thread, + /* arg = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("plugin: start_write_threads: pthread_create failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return; + } + + write_threads_num++; + } /* for (i) */ +} /* }}} void start_write_threads */ + +static void stop_write_threads (void) /* {{{ */ +{ + write_queue_t *q; + int i; + + if (write_threads == NULL) + return; + + INFO ("collectd: Stopping %zu write threads.", write_threads_num); + + pthread_mutex_lock (&write_lock); + write_loop = 0; + DEBUG ("plugin: stop_write_threads: Signalling `write_cond'"); + pthread_cond_broadcast (&write_cond); + pthread_mutex_unlock (&write_lock); + + for (i = 0; i < write_threads_num; i++) + { + if (pthread_join (write_threads[i], NULL) != 0) + { + ERROR ("plugin: stop_write_threads: pthread_join failed."); + } + write_threads[i] = (pthread_t) 0; + } + sfree (write_threads); + write_threads_num = 0; + + pthread_mutex_lock (&write_lock); + i = 0; + for (q = write_queue_head; q != NULL; q = q->next) + { + plugin_value_list_free (q->vl); + sfree (q); + i++; + } + write_queue_head = NULL; + write_queue_tail = NULL; + pthread_mutex_unlock (&write_lock); + + if (i > 0) + { + WARNING ("plugin: %i value list%s left after shutting down " + "the write threads.", + i, (i == 1) ? " was" : "s were"); + } +} /* }}} void stop_write_threads */ + /* * Public functions */ @@ -1166,6 +1388,15 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + { + char const *tmp = global_option_get ("WriteThreads"); + int num = atoi (tmp); + + if (num < 1) + num = 5; + + start_write_threads ((size_t) num); + } if ((list_init == NULL) && (read_heap == NULL)) return; @@ -1435,6 +1666,8 @@ void plugin_shutdown_all (void) plugin_set_ctx (old_ctx); } + stop_write_threads (); + /* Write plugins which use the `user_data' pointer usually need the * same data available to the flush callback. If this is the case, set * the free_function to NULL when registering the flush callback and to @@ -1490,7 +1723,7 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ return (0); } /* int }}} plugin_dispatch_missing */ -int plugin_dispatch_values (value_list_t *vl) +static int plugin_dispatch_values_internal (value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; @@ -1684,52 +1917,28 @@ int plugin_dispatch_values (value_list_t *vl) } return (0); -} /* int plugin_dispatch_values */ +} /* int plugin_dispatch_values_internal */ -int plugin_dispatch_values_secure (const value_list_t *vl) +int plugin_dispatch_values (value_list_t const *vl) { - value_list_t vl_copy; - int status; - - if (vl == NULL) - return EINVAL; - - memcpy (&vl_copy, vl, sizeof (vl_copy)); - - /* Write callbacks must not change the values and meta pointers, so we can - * savely skip copying those and make this more efficient. */ - if ((pre_cache_chain == NULL) && (post_cache_chain == NULL)) - return (plugin_dispatch_values (&vl_copy)); - - /* Set pointers to NULL, just to be on the save side. */ - vl_copy.values = NULL; - vl_copy.meta = NULL; - - vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len); - if (vl_copy.values == NULL) - { - ERROR ("plugin_dispatch_values_secure: malloc failed."); - return (ENOMEM); - } - memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len); - - if (vl->meta != NULL) - { - vl_copy.meta = meta_data_clone (vl->meta); - if (vl_copy.meta == NULL) - { - ERROR ("plugin_dispatch_values_secure: meta_data_clone failed."); - free (vl_copy.values); - return (ENOMEM); - } - } /* if (vl->meta) */ + int status; - status = plugin_dispatch_values (&vl_copy); + status = plugin_write_enqueue (vl); + if (status != 0) + { + char errbuf[1024]; + ERROR ("plugin_dispatch_values: plugin_write_enqueue failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return (status); + } - meta_data_destroy (vl_copy.meta); - free (vl_copy.values); + return (0); +} - return (status); +int plugin_dispatch_values_secure (const value_list_t *vl) +{ + return (plugin_dispatch_values (vl)); } /* int plugin_dispatch_values_secure */ int plugin_dispatch_notification (const notification_t *notif) diff --git a/src/plugin.h b/src/plugin.h index 0f35de56..c28709e0 100644 --- a/src/plugin.h +++ b/src/plugin.h @@ -326,7 +326,7 @@ int plugin_unregister_notification (const char *name); * `vl' Value list of the values that have been read by a `read' * function. */ -int plugin_dispatch_values (value_list_t *vl); +int plugin_dispatch_values (value_list_t const *vl); int plugin_dispatch_values_secure (const value_list_t *vl); int plugin_dispatch_missing (const value_list_t *vl);