X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=647c1fb691d4b09bb2b9f74ff61b5958db6b032f;hb=106e2016957fe53e4d249f1352bb5a7ec9caa76b;hp=b120e7ba6f5f0c9b5c612486001a46a07abfa9c9;hpb=cfb86a0f119d0ee0e72a920ba1251bb0350cf08b;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index b120e7ba..647c1fb6 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -51,6 +51,7 @@ typedef struct callback_func_s callback_func_t; #define RF_SIMPLE 0 #define RF_COMPLEX 1 +#define RF_REMOVE 65535 struct read_func_s { /* `read_func_t' "inherits" from `callback_func_t'. @@ -84,6 +85,7 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; static c_heap_t *read_heap = NULL; +static llist_t *read_list; static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; @@ -300,6 +302,13 @@ static int plugin_load_file (char *file) return (0); } +static _Bool timeout_reached(struct timespec timeout) +{ + struct timeval now; + gettimeofday(&now, NULL); + return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000)); +} + static void *plugin_read_thread (void __attribute__((unused)) *args) { while (read_loop != 0) @@ -307,6 +316,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) read_func_t *rf; struct timeval now; int status; + int rf_type; + int rc; /* Get the read function that needs to be read next. */ rf = c_head_get_root (read_heap); @@ -342,8 +353,18 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) /* sleep until this entry is due, * using pthread_cond_timedwait */ pthread_mutex_lock (&read_lock); - pthread_cond_timedwait (&read_cond, &read_lock, + /* In pthread_cond_timedwait, spurious wakeups are possible + * (and really happen, at least on NetBSD with > 1 CPU), thus + * we need to re-evaluate the condition every time + * pthread_cond_timedwait returns. */ + rc = 0; + while (!timeout_reached(rf->rf_next_read) && rc == 0) { + rc = pthread_cond_timedwait (&read_cond, &read_lock, &rf->rf_next_read); + } + + /* Must hold `real_lock' when accessing `rf->rf_type'. */ + rf_type = rf->rf_type; pthread_mutex_unlock (&read_lock); /* Check if we're supposed to stop.. This may have interrupted @@ -355,9 +376,22 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) break; } + /* The entry has been marked for deletion. The linked list + * entry has already been removed by `plugin_unregister_read'. + * All we have to do here is free the `read_func_t' and + * continue. */ + if (rf_type == RF_REMOVE) + { + DEBUG ("plugin_read_thread: Destroying the `%s' " + "callback.", rf->rf_name); + destroy_callback ((callback_func_t *) rf); + rf = NULL; + continue; + } + DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name); - if (rf->rf_type == RF_SIMPLE) + if (rf_type == RF_SIMPLE) { int (*callback) (void); @@ -368,6 +402,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) { plugin_read_cb callback; + assert (rf_type == RF_COMPLEX); + callback = rf->rf_callback; status = (*callback) (&rf->rf_udata); } @@ -641,22 +677,67 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1) return (0); } /* int plugin_compare_read_func */ -int plugin_register_read (const char *name, - int (*callback) (void)) +/* Add a read function to both, the heap and a linked list. The linked list if + * used to look-up read functions, especially for the remove function. The heap + * is used to determine which plugin to read next. */ +static int plugin_insert_read (read_func_t *rf) { - read_func_t *rf; + int status; + llentry_t *le; + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + read_list = llist_create (); + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: read_list failed."); + return (-1); + } + } if (read_heap == NULL) { read_heap = c_heap_create (plugin_compare_read_func); if (read_heap == NULL) { - ERROR ("plugin_register_read: " - "c_heap_create failed."); + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: c_heap_create failed."); return (-1); } } + le = llentry_create (rf->rf_name, rf); + if (le == NULL) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: llentry_create failed."); + return (-1); + } + + status = c_heap_insert (read_heap, rf); + if (status != 0) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: c_heap_insert failed."); + llentry_destroy (le); + return (-1); + } + + /* This does not fail. */ + llist_append (read_list, le); + + pthread_mutex_unlock (&read_lock); + return (0); +} /* int plugin_insert_read */ + +int plugin_register_read (const char *name, + int (*callback) (void)) +{ + read_func_t *rf; + rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -676,7 +757,7 @@ int plugin_register_read (const char *name, rf->rf_interval.tv_nsec = 0; rf->rf_effective_interval = rf->rf_interval; - return (c_heap_insert (read_heap, rf)); + return (plugin_insert_read (rf)); } /* int plugin_register_read */ int plugin_register_complex_read (const char *name, @@ -686,16 +767,6 @@ int plugin_register_complex_read (const char *name, { read_func_t *rf; - if (read_heap == NULL) - { - read_heap = c_heap_create (plugin_compare_read_func); - if (read_heap == NULL) - { - ERROR ("plugin_register_read: c_heap_create failed."); - return (-1); - } - } - rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -724,7 +795,7 @@ int plugin_register_complex_read (const char *name, rf->rf_udata = *user_data; } - return (c_heap_insert (read_heap, rf)); + return (plugin_insert_read (rf)); } /* int plugin_register_complex_read */ int plugin_register_write (const char *name, @@ -816,12 +887,45 @@ int plugin_unregister_init (const char *name) return (plugin_unregister (list_init, name)); } -int plugin_unregister_read (const char *name) +int plugin_unregister_read (const char *name) /* {{{ */ { - /* TODO: Implement removal of a specific key from the heap. */ - assert (0); - return (-1); -} + llentry_t *le; + read_func_t *rf; + + if (name == NULL) + return (-ENOENT); + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + return (-ENOENT); + } + + le = llist_search (read_list, name); + if (le == NULL) + { + pthread_mutex_unlock (&read_lock); + WARNING ("plugin_unregister_read: No such read function: %s", + name); + return (-ENOENT); + } + + llist_remove (read_list, le); + + rf = le->value; + assert (rf != NULL); + rf->rf_type = RF_REMOVE; + + pthread_mutex_unlock (&read_lock); + + llentry_destroy (le); + + DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name); + + return (0); +} /* }}} int plugin_unregister_read */ int plugin_unregister_write (const char *name) { @@ -1093,6 +1197,12 @@ void plugin_shutdown_all (void) stop_read_threads (); destroy_all_callbacks (&list_init); + + pthread_mutex_lock (&read_lock); + llist_destroy (read_list); + read_list = NULL; + pthread_mutex_unlock (&read_lock); + destroy_read_heap (); plugin_flush (/* plugin = */ NULL, /* timeout = */ -1, @@ -1119,8 +1229,14 @@ void plugin_shutdown_all (void) (*callback) (); } - destroy_all_callbacks (&list_write); + /* Write plugins which use the `user_data' pointer usually need the + * same data available to the flush callback. If this is the case, set + * the free_function to NULL when registering the flush callback and to + * the real free function when registering the write callback. This way + * the data isn't freed twice. */ destroy_all_callbacks (&list_flush); + destroy_all_callbacks (&list_write); + destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); @@ -1136,6 +1252,8 @@ int plugin_dispatch_values (value_list_t *vl) data_set_t *ds; + int free_meta_data = 0; + if ((vl == NULL) || (vl->type[0] == 0) || (vl->values == NULL) || (vl->values_len < 1)) { @@ -1143,6 +1261,12 @@ int plugin_dispatch_values (value_list_t *vl) return (-1); } + /* Free meta data only if the calling function didn't specify any. In + * this case matches and targets may add some and the calling function + * may not expect (and therefore free) that data. */ + if (vl->meta == NULL) + free_meta_data = 1; + if (list_write == NULL) c_complain_once (LOG_WARNING, &no_write_complaint, "plugin_dispatch_values: No write callback has been " @@ -1257,6 +1381,9 @@ int plugin_dispatch_values (value_list_t *vl) /* Update the value cache */ uc_update (ds, vl); + /* Initiate threshold checking */ + ut_check_threshold (ds, vl); + if (post_cache_chain != NULL) { status = fc_process_chain (ds, vl, post_cache_chain); @@ -1280,6 +1407,12 @@ int plugin_dispatch_values (value_list_t *vl) vl->values_len = saved_values_len; } + if ((free_meta_data != 0) && (vl->meta != NULL)) + { + meta_data_destroy (vl->meta); + vl->meta = NULL; + } + return (0); } /* int plugin_dispatch_values */ @@ -1326,9 +1459,6 @@ void plugin_log (int level, const char *format, ...) va_list ap; llentry_t *le; - if (list_log == NULL) - return; - #if !COLLECT_DEBUG if (level >= LOG_DEBUG) return; @@ -1339,6 +1469,12 @@ void plugin_log (int level, const char *format, ...) msg[sizeof (msg) - 1] = '\0'; va_end (ap); + if (list_log == NULL) + { + fprintf (stderr, "%s\n", msg); + return; + } + le = llist_head (list_log); while (le != NULL) {