X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=2e25758e60609e77aa7a3074abc469f8c9bed6e9;hb=a04ffbda508739433df0975328100e33e7986c87;hp=6139baf0de05a2145ac5020d73d2bf64ef2814ae;hpb=307459f5a1d79ee373cf59c7c8a0824d325c43e3;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 6139baf0..2e25758e 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -51,6 +51,7 @@ typedef struct callback_func_s callback_func_t; #define RF_SIMPLE 0 #define RF_COMPLEX 1 +#define RF_REMOVE 65535 struct read_func_s { /* `read_func_t' "inherits" from `callback_func_t'. @@ -58,6 +59,7 @@ struct read_func_s #define rf_callback rf_super.cf_callback #define rf_udata rf_super.cf_udata callback_func_t rf_super; + char rf_group[DATA_MAX_NAME_LEN]; char rf_name[DATA_MAX_NAME_LEN]; int rf_type; struct timespec rf_interval; @@ -84,6 +86,7 @@ static c_avl_tree_t *data_sets; static char *plugindir = NULL; static c_heap_t *read_heap = NULL; +static llist_t *read_list; static int read_loop = 1; static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; @@ -149,7 +152,7 @@ static void destroy_read_heap (void) /* {{{ */ { callback_func_t *cf; - cf = c_head_get_root (read_heap); + cf = c_heap_get_root (read_heap); if (cf == NULL) break; @@ -171,7 +174,7 @@ static int register_callback (llist_t **list, /* {{{ */ *list = llist_create (); if (*list == NULL) { - ERROR ("plugin: create_register_callback: " + ERROR ("plugin: register_callback: " "llist_create failed."); destroy_callback (cf); return (-1); @@ -181,7 +184,7 @@ static int register_callback (llist_t **list, /* {{{ */ key = strdup (name); if (key == NULL) { - ERROR ("plugin: create_register_callback: strdup failed."); + ERROR ("plugin: register_callback: strdup failed."); destroy_callback (cf); return (-1); } @@ -192,7 +195,7 @@ static int register_callback (llist_t **list, /* {{{ */ le = llentry_create (key, cf); if (le == NULL) { - ERROR ("plugin: create_register_callback: " + ERROR ("plugin: register_callback: " "llentry_create failed."); free (key); destroy_callback (cf); @@ -208,6 +211,10 @@ static int register_callback (llist_t **list, /* {{{ */ old_cf = le->value; le->value = cf; + WARNING ("plugin: register_callback: " + "a callback named `%s' already exists - " + "overwriting the old entry!", name); + destroy_callback (old_cf); sfree (key); } @@ -268,7 +275,7 @@ static int plugin_unregister (llist_t *list, const char *name) /* {{{ */ * object, but it will bitch about a shared object not having a * ``module_register'' symbol.. */ -static int plugin_load_file (char *file) +static int plugin_load_file (char *file, uint32_t flags) { lt_dlhandle dlh; void (*reg_handle) (void); @@ -278,7 +285,24 @@ static int plugin_load_file (char *file) lt_dlinit (); lt_dlerror (); /* clear errors */ - if ((dlh = lt_dlopen (file)) == NULL) +#if LIBTOOL_VERSION == 2 + if (flags & PLUGIN_FLAGS_GLOBAL) { + lt_dladvise advise; + lt_dladvise_init(&advise); + lt_dladvise_global(&advise); + dlh = lt_dlopenadvise(file, advise); + lt_dladvise_destroy(&advise); + } else { + dlh = lt_dlopen (file); + } +#else /* if LIBTOOL_VERSION == 1 */ + if (flags & PLUGIN_FLAGS_GLOBAL) + ERROR ("plugin_load_file: The global flag is not supported, " + "libtool 2 is required for this."); + dlh = lt_dlopen (file); +#endif + + if (dlh == NULL) { const char *error = lt_dlerror (); @@ -300,24 +324,32 @@ static int plugin_load_file (char *file) return (0); } +static _Bool timeout_reached(struct timespec timeout) +{ + struct timeval now; + gettimeofday(&now, NULL); + return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000)); +} + static void *plugin_read_thread (void __attribute__((unused)) *args) { while (read_loop != 0) { read_func_t *rf; - struct timeval now; + cdtime_t now; int status; + int rf_type; + int rc; /* Get the read function that needs to be read next. */ - rf = c_head_get_root (read_heap); + rf = c_heap_get_root (read_heap); if (rf == NULL) { struct timespec abstime; - gettimeofday (&now, /* timezone = */ NULL); + now = cdtime (); - abstime.tv_sec = now.tv_sec + interval_g; - abstime.tv_nsec = 1000 * now.tv_usec; + CDTIME_T_TO_TIMESPEC (now + interval_g, &abstime); pthread_mutex_lock (&read_lock); pthread_cond_timedwait (&read_cond, &read_lock, @@ -328,22 +360,33 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0)) { - gettimeofday (&now, /* timezone = */ NULL); + now = cdtime (); - rf->rf_interval.tv_sec = interval_g; - rf->rf_interval.tv_nsec = 0; + CDTIME_T_TO_TIMESPEC (interval_g, &rf->rf_interval); rf->rf_effective_interval = rf->rf_interval; - rf->rf_next_read.tv_sec = now.tv_sec; - rf->rf_next_read.tv_nsec = 1000 * now.tv_usec; + CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); } /* sleep until this entry is due, * using pthread_cond_timedwait */ pthread_mutex_lock (&read_lock); - pthread_cond_timedwait (&read_cond, &read_lock, + /* In pthread_cond_timedwait, spurious wakeups are possible + * (and really happen, at least on NetBSD with > 1 CPU), thus + * we need to re-evaluate the condition every time + * pthread_cond_timedwait returns. */ + rc = 0; + while ((read_loop != 0) + && !timeout_reached(rf->rf_next_read) + && rc == 0) + { + rc = pthread_cond_timedwait (&read_cond, &read_lock, &rf->rf_next_read); + } + + /* Must hold `read_lock' when accessing `rf->rf_type'. */ + rf_type = rf->rf_type; pthread_mutex_unlock (&read_lock); /* Check if we're supposed to stop.. This may have interrupted @@ -355,9 +398,22 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) break; } + /* The entry has been marked for deletion. The linked list + * entry has already been removed by `plugin_unregister_read'. + * All we have to do here is free the `read_func_t' and + * continue. */ + if (rf_type == RF_REMOVE) + { + DEBUG ("plugin_read_thread: Destroying the `%s' " + "callback.", rf->rf_name); + destroy_callback ((callback_func_t *) rf); + rf = NULL; + continue; + } + DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name); - if (rf->rf_type == RF_SIMPLE) + if (rf_type == RF_SIMPLE) { int (*callback) (void); @@ -368,6 +424,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) { plugin_read_cb callback; + assert (rf_type == RF_COMPLEX); + callback = rf->rf_callback; status = (*callback) (&rf->rf_udata); } @@ -398,7 +456,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) } /* update the ``next read due'' field */ - gettimeofday (&now, /* timezone = */ NULL); + now = cdtime (); DEBUG ("plugin_read_thread: Effective interval of the " "%s plugin is %i.%09i.", @@ -415,15 +473,12 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) NORMALIZE_TIMESPEC (rf->rf_next_read); /* Check, if `rf_next_read' is in the past. */ - if ((rf->rf_next_read.tv_sec < now.tv_sec) - || ((rf->rf_next_read.tv_sec == now.tv_sec) - && (rf->rf_next_read.tv_nsec < (1000 * now.tv_usec)))) + if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now) { /* `rf_next_read' is in the past. Insert `now' * so this value doesn't trail off into the * past too much. */ - rf->rf_next_read.tv_sec = now.tv_sec; - rf->rf_next_read.tv_nsec = 1000 * now.tv_usec; + CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); } DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.", @@ -515,7 +570,7 @@ void plugin_set_dir (const char *dir) } #define BUFSIZE 512 -int plugin_load (const char *type) +int plugin_load (const char *type, uint32_t flags) { DIR *dh; const char *dir; @@ -577,7 +632,7 @@ int plugin_load (const char *type) continue; } - if (plugin_load_file (filename) == 0) + if (plugin_load_file (filename, flags) == 0) { /* success */ ret = 0; @@ -641,22 +696,67 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1) return (0); } /* int plugin_compare_read_func */ -int plugin_register_read (const char *name, - int (*callback) (void)) +/* Add a read function to both, the heap and a linked list. The linked list if + * used to look-up read functions, especially for the remove function. The heap + * is used to determine which plugin to read next. */ +static int plugin_insert_read (read_func_t *rf) { - read_func_t *rf; + int status; + llentry_t *le; + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + read_list = llist_create (); + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: read_list failed."); + return (-1); + } + } if (read_heap == NULL) { read_heap = c_heap_create (plugin_compare_read_func); if (read_heap == NULL) { - ERROR ("plugin_register_read: " - "c_heap_create failed."); + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: c_heap_create failed."); return (-1); } } + le = llentry_create (rf->rf_name, rf); + if (le == NULL) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: llentry_create failed."); + return (-1); + } + + status = c_heap_insert (read_heap, rf); + if (status != 0) + { + pthread_mutex_unlock (&read_lock); + ERROR ("plugin_insert_read: c_heap_insert failed."); + llentry_destroy (le); + return (-1); + } + + /* This does not fail. */ + llist_append (read_list, le); + + pthread_mutex_unlock (&read_lock); + return (0); +} /* int plugin_insert_read */ + +int plugin_register_read (const char *name, + int (*callback) (void)) +{ + read_func_t *rf; + rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -670,32 +770,23 @@ int plugin_register_read (const char *name, rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; + rf->rf_group[0] = '\0'; sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_SIMPLE; rf->rf_interval.tv_sec = 0; rf->rf_interval.tv_nsec = 0; rf->rf_effective_interval = rf->rf_interval; - return (c_heap_insert (read_heap, rf)); + return (plugin_insert_read (rf)); } /* int plugin_register_read */ -int plugin_register_complex_read (const char *name, +int plugin_register_complex_read (const char *group, const char *name, plugin_read_cb callback, const struct timespec *interval, user_data_t *user_data) { read_func_t *rf; - if (read_heap == NULL) - { - read_heap = c_heap_create (plugin_compare_read_func); - if (read_heap == NULL) - { - ERROR ("plugin_register_read: c_heap_create failed."); - return (-1); - } - } - rf = (read_func_t *) malloc (sizeof (read_func_t)); if (rf == NULL) { @@ -705,6 +796,10 @@ int plugin_register_complex_read (const char *name, memset (rf, 0, sizeof (read_func_t)); rf->rf_callback = (void *) callback; + if (group != NULL) + sstrncpy (rf->rf_group, group, sizeof (rf->rf_group)); + else + rf->rf_group[0] = '\0'; sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_COMPLEX; if (interval != NULL) @@ -724,7 +819,7 @@ int plugin_register_complex_read (const char *name, rf->rf_udata = *user_data; } - return (c_heap_insert (read_heap, rf)); + return (plugin_insert_read (rf)); } /* int plugin_register_complex_read */ int plugin_register_write (const char *name, @@ -816,12 +911,106 @@ int plugin_unregister_init (const char *name) return (plugin_unregister (list_init, name)); } -int plugin_unregister_read (const char *name) +int plugin_unregister_read (const char *name) /* {{{ */ { - /* TODO: Implement removal of a specific key from the heap. */ - assert (0); - return (-1); -} + llentry_t *le; + read_func_t *rf; + + if (name == NULL) + return (-ENOENT); + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + return (-ENOENT); + } + + le = llist_search (read_list, name); + if (le == NULL) + { + pthread_mutex_unlock (&read_lock); + WARNING ("plugin_unregister_read: No such read function: %s", + name); + return (-ENOENT); + } + + llist_remove (read_list, le); + + rf = le->value; + assert (rf != NULL); + rf->rf_type = RF_REMOVE; + + pthread_mutex_unlock (&read_lock); + + llentry_destroy (le); + + DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name); + + return (0); +} /* }}} int plugin_unregister_read */ + +static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */ +{ + read_func_t *rf = e->value; + char *group = ud; + + return strcmp (rf->rf_group, (const char *)group); +} /* }}} int compare_read_func_group */ + +int plugin_unregister_read_group (const char *group) /* {{{ */ +{ + llentry_t *le; + read_func_t *rf; + + int found = 0; + + if (group == NULL) + return (-ENOENT); + + pthread_mutex_lock (&read_lock); + + if (read_list == NULL) + { + pthread_mutex_unlock (&read_lock); + return (-ENOENT); + } + + while (42) + { + le = llist_search_custom (read_list, + compare_read_func_group, (void *)group); + + if (le == NULL) + break; + + ++found; + + llist_remove (read_list, le); + + rf = le->value; + assert (rf != NULL); + rf->rf_type = RF_REMOVE; + + llentry_destroy (le); + + DEBUG ("plugin_unregister_read_group: " + "Marked `%s' (group `%s') for removal.", + rf->rf_name, group); + } + + pthread_mutex_unlock (&read_lock); + + if (found == 0) + { + WARNING ("plugin_unregister_read_group: No such " + "group of read function: %s", group); + return (-ENOENT); + } + + return (0); +} /* }}} int plugin_unregister_read_group */ int plugin_unregister_write (const char *name) { @@ -948,7 +1137,7 @@ int plugin_read_all_once (void) { read_func_t *rf; - rf = c_head_get_root (read_heap); + rf = c_heap_get_root (read_heap); if (rf == NULL) break; @@ -1056,7 +1245,7 @@ int plugin_write (const char *plugin, /* {{{ */ return (status); } /* }}} int plugin_write */ -int plugin_flush (const char *plugin, int timeout, const char *identifier) +int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier) { llentry_t *le; @@ -1093,6 +1282,12 @@ void plugin_shutdown_all (void) stop_read_threads (); destroy_all_callbacks (&list_init); + + pthread_mutex_lock (&read_lock); + llist_destroy (read_list); + read_list = NULL; + pthread_mutex_unlock (&read_lock); + destroy_read_heap (); plugin_flush (/* plugin = */ NULL, /* timeout = */ -1, @@ -1119,8 +1314,14 @@ void plugin_shutdown_all (void) (*callback) (); } - destroy_all_callbacks (&list_write); + /* Write plugins which use the `user_data' pointer usually need the + * same data available to the flush callback. If this is the case, set + * the free_function to NULL when registering the flush callback and to + * the real free function when registering the write callback. This way + * the data isn't freed twice. */ destroy_all_callbacks (&list_flush); + destroy_all_callbacks (&list_write); + destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); @@ -1136,6 +1337,8 @@ int plugin_dispatch_values (value_list_t *vl) data_set_t *ds; + int free_meta_data = 0; + if ((vl == NULL) || (vl->type[0] == 0) || (vl->values == NULL) || (vl->values_len < 1)) { @@ -1143,6 +1346,12 @@ int plugin_dispatch_values (value_list_t *vl) return (-1); } + /* Free meta data only if the calling function didn't specify any. In + * this case matches and targets may add some and the calling function + * may not expect (and therefore free) that data. */ + if (vl->meta == NULL) + free_meta_data = 1; + if (list_write == NULL) c_complain_once (LOG_WARNING, &no_write_complaint, "plugin_dispatch_values: No write callback has been " @@ -1159,21 +1368,27 @@ int plugin_dispatch_values (value_list_t *vl) if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0) { - INFO ("plugin_dispatch_values: Dataset not found: %s", vl->type); + char ident[6 * DATA_MAX_NAME_LEN]; + + FORMAT_VL (ident, sizeof (ident), vl); + INFO ("plugin_dispatch_values: Dataset not found: %s " + "(from \"%s\"), check your types.db!", + vl->type, ident); return (-1); } if (vl->time == 0) - vl->time = time (NULL); + vl->time = cdtime (); if (vl->interval <= 0) vl->interval = interval_g; - DEBUG ("plugin_dispatch_values: time = %u; interval = %i; " + DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; " "host = %s; " "plugin = %s; plugin_instance = %s; " "type = %s; type_instance = %s;", - (unsigned int) vl->time, vl->interval, + CDTIME_T_TO_DOUBLE (vl->time), + CDTIME_T_TO_DOUBLE (vl->interval), vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance); @@ -1283,6 +1498,12 @@ int plugin_dispatch_values (value_list_t *vl) vl->values_len = saved_values_len; } + if ((free_meta_data != 0) && (vl->meta != NULL)) + { + meta_data_destroy (vl->meta); + vl->meta = NULL; + } + return (0); } /* int plugin_dispatch_values */ @@ -1292,9 +1513,9 @@ int plugin_dispatch_notification (const notification_t *notif) /* Possible TODO: Add flap detection here */ DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; " - "time = %u; host = %s;", + "time = %.3f; host = %s;", notif->severity, notif->message, - (unsigned int) notif->time, notif->host); + CDTIME_T_TO_DOUBLE (notif->time), notif->host); /* Nobody cares for notifications */ if (list_notification == NULL) @@ -1329,9 +1550,6 @@ void plugin_log (int level, const char *format, ...) va_list ap; llentry_t *le; - if (list_log == NULL) - return; - #if !COLLECT_DEBUG if (level >= LOG_DEBUG) return; @@ -1342,6 +1560,12 @@ void plugin_log (int level, const char *format, ...) msg[sizeof (msg) - 1] = '\0'; va_end (ap); + if (list_log == NULL) + { + fprintf (stderr, "%s\n", msg); + return; + } + le = llist_head (list_log); while (le != NULL) { @@ -1425,7 +1649,7 @@ static int plugin_notification_meta_add (notification_t *n, } case NM_TYPE_BOOLEAN: { - meta->nm_value.nm_boolean = *((bool *) value); + meta->nm_value.nm_boolean = *((_Bool *) value); break; } default: @@ -1479,7 +1703,7 @@ int plugin_notification_meta_add_double (notification_t *n, int plugin_notification_meta_add_boolean (notification_t *n, const char *name, - bool value) + _Bool value) { return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value)); }