X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=11a0ef6eadbada45aad092f7116c879aeef9dfdd;hb=8d2f2ffd15ac83343aae58c82577b929e2e5f4ab;hp=b120e7ba6f5f0c9b5c612486001a46a07abfa9c9;hpb=7a369682c637e685587b021cbe0835b4fe8d2cb2;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index b120e7ba..11a0ef6e 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -51,6 +51,7 @@ typedef struct callback_func_s callback_func_t; #define RF_SIMPLE 0 #define RF_COMPLEX 1 +#define RF_REMOVE 65535 struct read_func_s { /* `read_func_t' "inherits" from `callback_func_t'. @@ -84,6 +85,7 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; static c_heap_t *read_heap = NULL; +static llist_t *read_list; static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; @@ -307,6 +309,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) read_func_t *rf; struct timeval now; int status; + int rf_type; /* Get the read function that needs to be read next. */ rf = c_head_get_root (read_heap); @@ -344,6 +347,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) pthread_mutex_lock (&read_lock); pthread_cond_timedwait (&read_cond, &read_lock, &rf->rf_next_read); + /* Must hold `real_lock' when accessing `rf->rf_type'. */ + rf_type = rf->rf_type; pthread_mutex_unlock (&read_lock); /* Check if we're supposed to stop.. This may have interrupted @@ -355,9 +360,22 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) break; } + /* The entry has been marked for deletion. The linked list + * entry has already been removed by `plugin_unregister_read'. + * All we have to do here is free the `read_func_t' and + * continue. */ + if (rf_type == RF_REMOVE) + { + DEBUG ("plugin_read_thread: Destroying the `%s' " + "callback.", rf->rf_name); + destroy_callback ((callback_func_t *) rf); + rf = NULL; + continue; + } + DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name); - if (rf->rf_type == RF_SIMPLE) + if (rf_type == RF_SIMPLE) { int (*callback) (void); @@ -368,6 +386,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) { plugin_read_cb callback; + assert (rf_type == RF_COMPLEX); + callback = rf->rf_callback; status = (*callback) (&rf->rf_udata); } @@ -641,22 +661,67 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1) return (0); } /* int plugin_compare_read_func */ -int plugin_register_read (const char *name, - int (*callback) (void)) +/* Add a read function to both, the heap and a linked list. The linked list if + * used to look-up read functions, especially for the remove function. The heap + * is used to determine which plugin to read next. */ +static int plugin_insert_read (read_func_t *rf) { - read_func_t *rf; + int status; + llentry_t *le; + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + read_list = llist_create (); + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: read_list failed."); + return (-1); + } + } if (read_heap == NULL) { read_heap = c_heap_create (plugin_compare_read_func); if (read_heap == NULL) { - ERROR ("plugin_register_read: " - "c_heap_create failed."); + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: c_heap_create failed."); return (-1); } } + le = llentry_create (rf->rf_name, rf); + if (le == NULL) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: llentry_create failed."); + return (-1); + } + + status = c_heap_insert (read_heap, rf); + if (status != 0) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: c_heap_insert failed."); + llentry_destroy (le); + return (-1); + } + + /* This does not fail. */ + llist_append (read_list, le); + + pthread_mutex_unlock (&read_lock); + return (0); +} /* int plugin_insert_read */ + +int plugin_register_read (const char *name, + int (*callback) (void)) +{ + read_func_t *rf; + rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -676,7 +741,7 @@ int plugin_register_read (const char *name, rf->rf_interval.tv_nsec = 0; rf->rf_effective_interval = rf->rf_interval; - return (c_heap_insert (read_heap, rf)); + return (plugin_insert_read (rf)); } /* int plugin_register_read */ int plugin_register_complex_read (const char *name, @@ -686,16 +751,6 @@ int plugin_register_complex_read (const char *name, { read_func_t *rf; - if (read_heap == NULL) - { - read_heap = c_heap_create (plugin_compare_read_func); - if (read_heap == NULL) - { - ERROR ("plugin_register_read: c_heap_create failed."); - return (-1); - } - } - rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -724,7 +779,7 @@ int plugin_register_complex_read (const char *name, rf->rf_udata = *user_data; } - return (c_heap_insert (read_heap, rf)); + return (plugin_insert_read (rf)); } /* int plugin_register_complex_read */ int plugin_register_write (const char *name, @@ -816,12 +871,45 @@ int plugin_unregister_init (const char *name) return (plugin_unregister (list_init, name)); } -int plugin_unregister_read (const char *name) +int plugin_unregister_read (const char *name) /* {{{ */ { - /* TODO: Implement removal of a specific key from the heap. */ - assert (0); - return (-1); -} + llentry_t *le; + read_func_t *rf; + + if (name == NULL) + return (-ENOENT); + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + return (-ENOENT); + } + + le = llist_search (read_list, name); + if (le == NULL) + { + pthread_mutex_unlock (&read_lock); + WARNING ("plugin_unregister_read: No such read function: %s", + name); + return (-ENOENT); + } + + llist_remove (read_list, le); + + rf = le->value; + assert (rf != NULL); + rf->rf_type = RF_REMOVE; + + pthread_mutex_unlock (&read_lock); + + llentry_destroy (le); + + DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name); + + return (0); +} /* }}} int plugin_unregister_read */ int plugin_unregister_write (const char *name) { @@ -1093,6 +1181,12 @@ void plugin_shutdown_all (void) stop_read_threads (); destroy_all_callbacks (&list_init); + + pthread_mutex_lock (&read_lock); + llist_destroy (read_list); + read_list = NULL; + pthread_mutex_unlock (&read_lock); + destroy_read_heap (); plugin_flush (/* plugin = */ NULL, /* timeout = */ -1, @@ -1119,8 +1213,14 @@ void plugin_shutdown_all (void) (*callback) (); } - destroy_all_callbacks (&list_write); + /* Write plugins which use the `user_data' pointer usually need the + * same data available to the flush callback. If this is the case, set + * the free_function to NULL when registering the flush callback and to + * the real free function when registering the write callback. This way + * the data isn't freed twice. */ destroy_all_callbacks (&list_flush); + destroy_all_callbacks (&list_write); + destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); @@ -1136,6 +1236,8 @@ int plugin_dispatch_values (value_list_t *vl) data_set_t *ds; + int free_meta_data = 0; + if ((vl == NULL) || (vl->type[0] == 0) || (vl->values == NULL) || (vl->values_len < 1)) { @@ -1143,6 +1245,12 @@ int plugin_dispatch_values (value_list_t *vl) return (-1); } + /* Free meta data only if the calling function didn't specify any. In + * this case matches and targets may add some and the calling function + * may not expect (and therefore free) that data. */ + if (vl->meta == NULL) + free_meta_data = 1; + if (list_write == NULL) c_complain_once (LOG_WARNING, &no_write_complaint, "plugin_dispatch_values: No write callback has been " @@ -1257,6 +1365,9 @@ int plugin_dispatch_values (value_list_t *vl) /* Update the value cache */ uc_update (ds, vl); + /* Initiate threshold checking */ + ut_check_threshold (ds, vl); + if (post_cache_chain != NULL) { status = fc_process_chain (ds, vl, post_cache_chain); @@ -1280,6 +1391,12 @@ int plugin_dispatch_values (value_list_t *vl) vl->values_len = saved_values_len; } + if ((free_meta_data != 0) && (vl->meta != NULL)) + { + meta_data_destroy (vl->meta); + vl->meta = NULL; + } + return (0); } /* int plugin_dispatch_values */ @@ -1327,7 +1444,12 @@ void plugin_log (int level, const char *format, ...) llentry_t *le; if (list_log == NULL) + { + va_start (ap, format); + vfprintf (stderr, format, ap); + va_end (ap); return; + } #if !COLLECT_DEBUG if (level >= LOG_DEBUG)