From: Florian Forster Date: Thu, 19 Feb 2009 22:28:16 +0000 (+0100) Subject: src/plugin.[ch]: Implement `plugin_dispatch_values_async'. X-Git-Tag: collectd-4.7.0~127^2~41^2~1 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=ba6497bdffb6a225eb6c86c8b8fa57a2520f0c5f;p=collectd.git src/plugin.[ch]: Implement `plugin_dispatch_values_async'. This function differs from `plugin_dispatch_values' in that it will add the value_list_t to a queue rather than calling the write functions right away. This as at least two advantages: - The _async function will often return faster, since no file operation is done. - The ``read thread'' and the ``write thread'' are not the same thread. This makes it much easier for plugins that provide both, `read' and `write' callbacks. --- diff --git a/src/plugin.c b/src/plugin.c index 9f42f2e4..6fd74e2b 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -50,6 +50,14 @@ struct read_func_s }; typedef struct read_func_s read_func_t; +struct dispatch_queue_s; +typedef struct dispatch_queue_s dispatch_queue_t; +struct dispatch_queue_s +{ + value_list_t *vl; + dispatch_queue_t *next; +}; + /* * Private variables */ @@ -68,11 +76,19 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; -static int read_loop = 1; +static int read_loop = 0; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; -static int read_threads_num = 0; +static size_t read_threads_num = 0; + +static int dispatch_loop = 0; +static pthread_mutex_t dispatch_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t dispatch_cond = PTHREAD_COND_INITIALIZER; +static pthread_t *dispatch_threads = NULL; +static size_t dispatch_threads_num = 0; +static dispatch_queue_t *dispatch_head; +static dispatch_queue_t *dispatch_tail; /* * Static functions @@ -244,60 +260,138 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) return ((void *) 0); } /* void *plugin_read_thread */ -static void start_threads (int num) +static void *plugin_dispatch_thread (void *arg) { - int i; + pthread_mutex_lock (&dispatch_lock); - if (read_threads != NULL) - return; + while ((dispatch_loop != 0) || (dispatch_head != NULL)) + { + dispatch_queue_t *qi; + + if (dispatch_head == NULL) + pthread_cond_wait (&dispatch_cond, &dispatch_lock); + + if (dispatch_head == NULL) + continue; + + qi = dispatch_head; + + if (dispatch_head == dispatch_tail) + { + dispatch_head = NULL; + dispatch_tail = NULL; + } + else + { + dispatch_head = qi->next; + } + + pthread_mutex_unlock (&dispatch_lock); - read_threads = (pthread_t *) calloc (num, sizeof (pthread_t)); - if (read_threads == NULL) + plugin_dispatch_values (qi->vl); + sfree (qi->vl->values); + sfree (qi->vl); + sfree (qi); + + pthread_mutex_lock (&dispatch_lock); + } /* while */ + + pthread_mutex_unlock (&dispatch_lock); + + return ((void *) 0); +} /* void *plugin_dispatch_thread */ + +static int plugin_start_threads (size_t num, + pthread_t **tlist, size_t *tlist_num, + void *(*thread_main) (void *)) +{ + size_t i; + pthread_t *new_tlist; + size_t new_tlist_num; + + if (*tlist != NULL) + return (-1); + + new_tlist = (pthread_t *) calloc (num, sizeof (*new_tlist)); + if (new_tlist == NULL) { - ERROR ("plugin: start_threads: calloc failed."); - return; + ERROR ("plugin: plugin_start_threads: calloc failed."); + return (-1); } - read_threads_num = 0; + new_tlist_num = 0; for (i = 0; i < num; i++) { - if (pthread_create (read_threads + read_threads_num, NULL, - plugin_read_thread, NULL) == 0) - { - read_threads_num++; - } - else + int status; + + status = pthread_create (new_tlist + new_tlist_num, + /* attr = */ NULL, thread_main, + /* arg = */ (void *) 0); + if (status != 0) { - ERROR ("plugin: start_threads: pthread_create failed."); - return; + ERROR ("plugin: plugin_start_threads: " + "pthread_create failed."); + continue; } + + new_tlist_num++; } /* for (i) */ -} /* void start_threads */ -static void stop_threads (void) + if (new_tlist_num < 1) + { + ERROR ("plugin: plugin_start_threads: " + "Creating threads failed."); + sfree (new_tlist); + return (-1); + } + + *tlist = new_tlist; + *tlist_num = new_tlist_num; + + return (0); +} /* int plugin_start_threads */ + +static int plugin_stop_threads (int *loop, + pthread_mutex_t *lock, pthread_cond_t *cond, + pthread_t **ret_tlist, size_t *ret_tlist_len) { - int i; + pthread_t *tlist; + size_t tlist_len; + size_t i; - if (read_threads == NULL) - return; + tlist = *ret_tlist; + tlist_len = *ret_tlist_len; - pthread_mutex_lock (&read_lock); - read_loop = 0; - DEBUG ("plugin: stop_threads: Signalling `read_cond'"); - pthread_cond_broadcast (&read_cond); - pthread_mutex_unlock (&read_lock); + if (tlist == NULL) + return (0); + + DEBUG ("plugin_stop_threads: Stopping %zu threads.", tlist_len); + + pthread_mutex_lock (lock); + + *loop = 0; + + pthread_cond_broadcast (cond); + pthread_mutex_unlock (lock); - for (i = 0; i < read_threads_num; i++) + for (i = 0; i < tlist_len; i++) { - if (pthread_join (read_threads[i], NULL) != 0) + int status; + + status = pthread_join (tlist[i], NULL); + if (status != 0) { - ERROR ("plugin: stop_threads: pthread_join failed."); + ERROR ("plugin_stop_threads: pthread_join failed."); } - read_threads[i] = (pthread_t) 0; } - sfree (read_threads); - read_threads_num = 0; -} /* void stop_threads */ + + sfree (tlist); + + *ret_tlist = NULL; + *ret_tlist_len = 0; + + return (0); +} /* int plugin_stop_threads */ /* * Public functions @@ -640,8 +734,18 @@ void plugin_init_all (void) rt = global_option_get ("ReadThreads"); num = atoi (rt); if (num != -1) - start_threads ((num > 0) ? num : 5); + { + read_loop = 1; + plugin_start_threads ((num > 0) ? ((size_t ) num) : 5, + &read_threads, &read_threads_num, + plugin_read_thread); + } } + + dispatch_loop = 1; + plugin_start_threads (/* num = */ 1, /* FIXME: Make this number configurable */ + &dispatch_threads, &dispatch_threads_num, + plugin_dispatch_thread); } /* void plugin_init_all */ void plugin_read_all (void) @@ -812,7 +916,11 @@ void plugin_shutdown_all (void) int (*callback) (void); llentry_t *le; - stop_threads (); + plugin_stop_threads (&read_loop, &read_lock, &read_cond, + &read_threads, &read_threads_num); + + plugin_stop_threads (&dispatch_loop, &dispatch_lock, &dispatch_cond, + &dispatch_threads, &dispatch_threads_num); if (list_shutdown == NULL) return; @@ -937,6 +1045,76 @@ int plugin_dispatch_values (value_list_t *vl) return (0); } /* int plugin_dispatch_values */ +int plugin_dispatch_values_async (const value_list_t *vl) +{ + dispatch_queue_t *qi; + int i; + + if (vl == NULL) + return (-EINVAL); + + if (dispatch_threads_num < 1) + { + ERROR ("plugin_dispatch_values_async: " + "No dispatch threads have been started!"); +#ifdef ENOTCONN + return (-ENOTCONN); +#else + return (-1); +#endif + } + + qi = (dispatch_queue_t *) malloc (sizeof (*qi)); + if (qi == NULL) + { + ERROR ("plugin_dispatch_values_async: malloc failed."); + return (-ENOMEM); + } + memset (qi, 0, sizeof (*qi)); + qi->next = NULL; + + qi->vl = (value_list_t *) malloc (sizeof (value_list_t)); + if (qi->vl == NULL) + { + ERROR ("plugin_dispatch_values_async: malloc failed."); + sfree (qi); + return (-ENOMEM); + } + memcpy (qi->vl, vl, sizeof (value_list_t)); + qi->vl->values = NULL; + + qi->vl->values = (value_t *) calloc (qi->vl->values_len, + sizeof (value_t)); + if (qi->vl->values == NULL) + { + ERROR ("plugin_dispatch_values_async: malloc failed."); + sfree (qi->vl); + sfree (qi); + return (-ENOMEM); + } + + for (i = 0; i < vl->values_len; i++) + qi->vl->values[i] = vl->values[i]; + + pthread_mutex_lock (&dispatch_lock); + + if (dispatch_tail == NULL) + { + dispatch_head = qi; + dispatch_tail = qi; + } + else + { + dispatch_tail->next = qi; + dispatch_tail = qi; + } + + pthread_cond_signal (&dispatch_cond); + pthread_mutex_unlock (&dispatch_lock); + + return (0); +} /* int plugin_dispatch_values_async */ + int plugin_dispatch_notification (const notification_t *notif) { int (*callback) (const notification_t *); diff --git a/src/plugin.h b/src/plugin.h index 3088e06e..6c2c41ea 100644 --- a/src/plugin.h +++ b/src/plugin.h @@ -264,6 +264,7 @@ int plugin_unregister_notification (const char *name); * function. */ int plugin_dispatch_values (value_list_t *vl); +int plugin_dispatch_values_async (const value_list_t *vl); int plugin_dispatch_notification (const notification_t *notif);