From: Florian Forster Date: Sat, 21 Feb 2009 11:25:29 +0000 (+0100) Subject: Revert "src/plugin.[ch]: Implement `plugin_dispatch_values_async'." X-Git-Tag: collectd-4.7.0~127^2~38 X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=b1e441ef04535b807d0ee34f8d416687deeee9bf Revert "src/plugin.[ch]: Implement `plugin_dispatch_values_async'." This reverts commit ba6497bdffb6a225eb6c86c8b8fa57a2520f0c5f. --- diff --git a/src/plugin.c b/src/plugin.c index 6fd74e2b..9f42f2e4 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -50,14 +50,6 @@ struct read_func_s }; typedef struct read_func_s read_func_t; -struct dispatch_queue_s; -typedef struct dispatch_queue_s dispatch_queue_t; -struct dispatch_queue_s -{ - value_list_t *vl; - dispatch_queue_t *next; -}; - /* * Private variables */ @@ -76,19 +68,11 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; -static int read_loop = 0; +static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; -static size_t read_threads_num = 0; - -static int dispatch_loop = 0; -static pthread_mutex_t dispatch_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t dispatch_cond = PTHREAD_COND_INITIALIZER; -static pthread_t *dispatch_threads = NULL; -static size_t dispatch_threads_num = 0; -static dispatch_queue_t *dispatch_head; -static dispatch_queue_t *dispatch_tail; +static int read_threads_num = 0; /* * Static functions @@ -260,138 +244,60 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) return ((void *) 0); } /* void *plugin_read_thread */ -static void *plugin_dispatch_thread (void *arg) -{ - pthread_mutex_lock (&dispatch_lock); - - while ((dispatch_loop != 0) || (dispatch_head != NULL)) - { - dispatch_queue_t *qi; - - if (dispatch_head == NULL) - pthread_cond_wait (&dispatch_cond, &dispatch_lock); - - if (dispatch_head == NULL) - continue; - - qi = dispatch_head; - - if (dispatch_head == dispatch_tail) - { - dispatch_head = NULL; - dispatch_tail = NULL; - } - else - { - dispatch_head = qi->next; - } - - pthread_mutex_unlock (&dispatch_lock); - - plugin_dispatch_values (qi->vl); - sfree (qi->vl->values); - sfree (qi->vl); - sfree (qi); - - pthread_mutex_lock (&dispatch_lock); - } /* while */ - - pthread_mutex_unlock (&dispatch_lock); - - return ((void *) 0); -} /* void *plugin_dispatch_thread */ - -static int plugin_start_threads (size_t num, - pthread_t **tlist, size_t *tlist_num, - void *(*thread_main) (void *)) +static void start_threads (int num) { - size_t i; - pthread_t *new_tlist; - size_t new_tlist_num; + int i; - if (*tlist != NULL) - return (-1); + if (read_threads != NULL) + return; - new_tlist = (pthread_t *) calloc (num, sizeof (*new_tlist)); - if (new_tlist == NULL) + read_threads = (pthread_t *) calloc (num, sizeof (pthread_t)); + if (read_threads == NULL) { - ERROR ("plugin: plugin_start_threads: calloc failed."); - return (-1); + ERROR ("plugin: start_threads: calloc failed."); + return; } - new_tlist_num = 0; + read_threads_num = 0; for (i = 0; i < num; i++) { - int status; - - status = pthread_create (new_tlist + new_tlist_num, - /* attr = */ NULL, thread_main, - /* arg = */ (void *) 0); - if (status != 0) + if (pthread_create (read_threads + read_threads_num, NULL, + plugin_read_thread, NULL) == 0) { - ERROR ("plugin: plugin_start_threads: " - "pthread_create failed."); - continue; + read_threads_num++; + } + else + { + ERROR ("plugin: start_threads: pthread_create failed."); + return; } - - new_tlist_num++; } /* for (i) */ +} /* void start_threads */ - if (new_tlist_num < 1) - { - ERROR ("plugin: plugin_start_threads: " - "Creating threads failed."); - sfree (new_tlist); - return (-1); - } - - *tlist = new_tlist; - *tlist_num = new_tlist_num; - - return (0); -} /* int plugin_start_threads */ - -static int plugin_stop_threads (int *loop, - pthread_mutex_t *lock, pthread_cond_t *cond, - pthread_t **ret_tlist, size_t *ret_tlist_len) +static void stop_threads (void) { - pthread_t *tlist; - size_t tlist_len; - size_t i; - - tlist = *ret_tlist; - tlist_len = *ret_tlist_len; - - if (tlist == NULL) - return (0); - - DEBUG ("plugin_stop_threads: Stopping %zu threads.", tlist_len); - - pthread_mutex_lock (lock); + int i; - *loop = 0; + if (read_threads == NULL) + return; - pthread_cond_broadcast (cond); - pthread_mutex_unlock (lock); + pthread_mutex_lock (&read_lock); + read_loop = 0; + DEBUG ("plugin: stop_threads: Signalling `read_cond'"); + pthread_cond_broadcast (&read_cond); + pthread_mutex_unlock (&read_lock); - for (i = 0; i < tlist_len; i++) + for (i = 0; i < read_threads_num; i++) { - int status; - - status = pthread_join (tlist[i], NULL); - if (status != 0) + if (pthread_join (read_threads[i], NULL) != 0) { - ERROR ("plugin_stop_threads: pthread_join failed."); + ERROR ("plugin: stop_threads: pthread_join failed."); } + read_threads[i] = (pthread_t) 0; } - - sfree (tlist); - - *ret_tlist = NULL; - *ret_tlist_len = 0; - - return (0); -} /* int plugin_stop_threads */ + sfree (read_threads); + read_threads_num = 0; +} /* void stop_threads */ /* * Public functions @@ -734,18 +640,8 @@ void plugin_init_all (void) rt = global_option_get ("ReadThreads"); num = atoi (rt); if (num != -1) - { - read_loop = 1; - plugin_start_threads ((num > 0) ? ((size_t ) num) : 5, - &read_threads, &read_threads_num, - plugin_read_thread); - } + start_threads ((num > 0) ? num : 5); } - - dispatch_loop = 1; - plugin_start_threads (/* num = */ 1, /* FIXME: Make this number configurable */ - &dispatch_threads, &dispatch_threads_num, - plugin_dispatch_thread); } /* void plugin_init_all */ void plugin_read_all (void) @@ -916,11 +812,7 @@ void plugin_shutdown_all (void) int (*callback) (void); llentry_t *le; - plugin_stop_threads (&read_loop, &read_lock, &read_cond, - &read_threads, &read_threads_num); - - plugin_stop_threads (&dispatch_loop, &dispatch_lock, &dispatch_cond, - &dispatch_threads, &dispatch_threads_num); + stop_threads (); if (list_shutdown == NULL) return; @@ -1045,76 +937,6 @@ int plugin_dispatch_values (value_list_t *vl) return (0); } /* int plugin_dispatch_values */ -int plugin_dispatch_values_async (const value_list_t *vl) -{ - dispatch_queue_t *qi; - int i; - - if (vl == NULL) - return (-EINVAL); - - if (dispatch_threads_num < 1) - { - ERROR ("plugin_dispatch_values_async: " - "No dispatch threads have been started!"); -#ifdef ENOTCONN - return (-ENOTCONN); -#else - return (-1); -#endif - } - - qi = (dispatch_queue_t *) malloc (sizeof (*qi)); - if (qi == NULL) - { - ERROR ("plugin_dispatch_values_async: malloc failed."); - return (-ENOMEM); - } - memset (qi, 0, sizeof (*qi)); - qi->next = NULL; - - qi->vl = (value_list_t *) malloc (sizeof (value_list_t)); - if (qi->vl == NULL) - { - ERROR ("plugin_dispatch_values_async: malloc failed."); - sfree (qi); - return (-ENOMEM); - } - memcpy (qi->vl, vl, sizeof (value_list_t)); - qi->vl->values = NULL; - - qi->vl->values = (value_t *) calloc (qi->vl->values_len, - sizeof (value_t)); - if (qi->vl->values == NULL) - { - ERROR ("plugin_dispatch_values_async: malloc failed."); - sfree (qi->vl); - sfree (qi); - return (-ENOMEM); - } - - for (i = 0; i < vl->values_len; i++) - qi->vl->values[i] = vl->values[i]; - - pthread_mutex_lock (&dispatch_lock); - - if (dispatch_tail == NULL) - { - dispatch_head = qi; - dispatch_tail = qi; - } - else - { - dispatch_tail->next = qi; - dispatch_tail = qi; - } - - pthread_cond_signal (&dispatch_cond); - pthread_mutex_unlock (&dispatch_lock); - - return (0); -} /* int plugin_dispatch_values_async */ - int plugin_dispatch_notification (const notification_t *notif) { int (*callback) (const notification_t *); diff --git a/src/plugin.h b/src/plugin.h index 6c2c41ea..3088e06e 100644 --- a/src/plugin.h +++ b/src/plugin.h @@ -264,7 +264,6 @@ int plugin_unregister_notification (const char *name); * function. */ int plugin_dispatch_values (value_list_t *vl); -int plugin_dispatch_values_async (const value_list_t *vl); int plugin_dispatch_notification (const notification_t *notif);