#define RF_SIMPLE 0
#define RF_COMPLEX 1
+#define RF_REMOVE 65535
struct read_func_s
{
/* `read_func_t' "inherits" from `callback_func_t'.
#define rf_callback rf_super.cf_callback
#define rf_udata rf_super.cf_udata
callback_func_t rf_super;
+ char rf_group[DATA_MAX_NAME_LEN];
char rf_name[DATA_MAX_NAME_LEN];
int rf_type;
struct timespec rf_interval;
static char *plugindir = NULL;
static c_heap_t *read_heap = NULL;
+static llist_t *read_list;
static int read_loop = 1;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
{
callback_func_t *cf;
- cf = c_head_get_root (read_heap);
+ cf = c_heap_get_root (read_heap);
if (cf == NULL)
break;
*list = llist_create ();
if (*list == NULL)
{
- ERROR ("plugin: create_register_callback: "
+ ERROR ("plugin: register_callback: "
"llist_create failed.");
destroy_callback (cf);
return (-1);
key = strdup (name);
if (key == NULL)
{
- ERROR ("plugin: create_register_callback: strdup failed.");
+ ERROR ("plugin: register_callback: strdup failed.");
destroy_callback (cf);
return (-1);
}
le = llentry_create (key, cf);
if (le == NULL)
{
- ERROR ("plugin: create_register_callback: "
+ ERROR ("plugin: register_callback: "
"llentry_create failed.");
free (key);
destroy_callback (cf);
old_cf = le->value;
le->value = cf;
+ WARNING ("plugin: register_callback: "
+ "a callback named `%s' already exists - "
+ "overwriting the old entry!", name);
+
destroy_callback (old_cf);
sfree (key);
}
* object, but it will bitch about a shared object not having a
* ``module_register'' symbol..
*/
-static int plugin_load_file (char *file)
+static int plugin_load_file (char *file, uint32_t flags)
{
lt_dlhandle dlh;
void (*reg_handle) (void);
- DEBUG ("file = %s", file);
-
lt_dlinit ();
lt_dlerror (); /* clear errors */
- if ((dlh = lt_dlopen (file)) == NULL)
+#if LIBTOOL_VERSION == 2
+ if (flags & PLUGIN_FLAGS_GLOBAL) {
+ lt_dladvise advise;
+ lt_dladvise_init(&advise);
+ lt_dladvise_global(&advise);
+ dlh = lt_dlopenadvise(file, advise);
+ lt_dladvise_destroy(&advise);
+ } else {
+ dlh = lt_dlopen (file);
+ }
+#else /* if LIBTOOL_VERSION == 1 */
+ if (flags & PLUGIN_FLAGS_GLOBAL)
+ WARNING ("plugin_load_file: The global flag is not supported, "
+ "libtool 2 is required for this.");
+ dlh = lt_dlopen (file);
+#endif
+
+ if (dlh == NULL)
{
- const char *error = lt_dlerror ();
+ char errbuf[1024] = "";
+
+ ssnprintf (errbuf, sizeof (errbuf),
+ "lt_dlopen (\"%s\") failed: %s. "
+ "The most common cause for this problem are "
+ "missing dependencies. Use ldd(1) to check "
+ "the dependencies of the plugin "
+ "/ shared object.",
+ file, lt_dlerror ());
+
+ ERROR ("%s", errbuf);
+ /* Make sure this is printed to STDERR in any case, but also
+ * make sure it's printed only once. */
+ if (list_log != NULL)
+ fprintf (stderr, "ERROR: %s\n", errbuf);
- ERROR ("lt_dlopen (%s) failed: %s", file, error);
- fprintf (stderr, "lt_dlopen (%s) failed: %s\n", file, error);
return (1);
}
if ((reg_handle = (void (*) (void)) lt_dlsym (dlh, "module_register")) == NULL)
{
- WARNING ("Couldn't find symbol `module_register' in `%s': %s\n",
+ WARNING ("Couldn't find symbol \"module_register\" in \"%s\": %s\n",
file, lt_dlerror ());
lt_dlclose (dlh);
return (-1);
return (0);
}
+static _Bool timeout_reached(struct timespec timeout)
+{
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000));
+}
+
static void *plugin_read_thread (void __attribute__((unused)) *args)
{
while (read_loop != 0)
read_func_t *rf;
struct timeval now;
int status;
+ int rf_type;
+ int rc;
/* Get the read function that needs to be read next. */
- rf = c_head_get_root (read_heap);
+ rf = c_heap_get_root (read_heap);
if (rf == NULL)
{
struct timespec abstime;
/* sleep until this entry is due,
* using pthread_cond_timedwait */
pthread_mutex_lock (&read_lock);
- pthread_cond_timedwait (&read_cond, &read_lock,
+ /* In pthread_cond_timedwait, spurious wakeups are possible
+ * (and really happen, at least on NetBSD with > 1 CPU), thus
+ * we need to re-evaluate the condition every time
+ * pthread_cond_timedwait returns. */
+ rc = 0;
+ while ((read_loop != 0)
+ && !timeout_reached(rf->rf_next_read)
+ && rc == 0)
+ {
+ rc = pthread_cond_timedwait (&read_cond, &read_lock,
&rf->rf_next_read);
+ }
+
+ /* Must hold `read_lock' when accessing `rf->rf_type'. */
+ rf_type = rf->rf_type;
pthread_mutex_unlock (&read_lock);
/* Check if we're supposed to stop.. This may have interrupted
break;
}
+ /* The entry has been marked for deletion. The linked list
+ * entry has already been removed by `plugin_unregister_read'.
+ * All we have to do here is free the `read_func_t' and
+ * continue. */
+ if (rf_type == RF_REMOVE)
+ {
+ DEBUG ("plugin_read_thread: Destroying the `%s' "
+ "callback.", rf->rf_name);
+ destroy_callback ((callback_func_t *) rf);
+ rf = NULL;
+ continue;
+ }
+
DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
- if (rf->rf_type == RF_SIMPLE)
+ if (rf_type == RF_SIMPLE)
{
int (*callback) (void);
{
plugin_read_cb callback;
+ assert (rf_type == RF_COMPLEX);
+
callback = rf->rf_callback;
status = (*callback) (&rf->rf_udata);
}
}
#define BUFSIZE 512
-int plugin_load (const char *type)
+int plugin_load (const char *type, uint32_t flags)
{
DIR *dh;
const char *dir;
continue;
}
- if (plugin_load_file (filename) == 0)
+ if (plugin_load_file (filename, flags) == 0)
{
/* success */
ret = 0;
return (0);
} /* int plugin_compare_read_func */
-int plugin_register_read (const char *name,
- int (*callback) (void))
+/* Add a read function to both, the heap and a linked list. The linked list if
+ * used to look-up read functions, especially for the remove function. The heap
+ * is used to determine which plugin to read next. */
+static int plugin_insert_read (read_func_t *rf)
{
- read_func_t *rf;
+ int status;
+ llentry_t *le;
+
+ pthread_mutex_lock (&read_lock);
+
+ if (read_list == NULL)
+ {
+ read_list = llist_create ();
+ if (read_list == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: read_list failed.");
+ return (-1);
+ }
+ }
if (read_heap == NULL)
{
read_heap = c_heap_create (plugin_compare_read_func);
if (read_heap == NULL)
{
- ERROR ("plugin_register_read: "
- "c_heap_create failed.");
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: c_heap_create failed.");
return (-1);
}
}
+ le = llentry_create (rf->rf_name, rf);
+ if (le == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: llentry_create failed.");
+ return (-1);
+ }
+
+ status = c_heap_insert (read_heap, rf);
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&read_lock);
+ ERROR ("plugin_insert_read: c_heap_insert failed.");
+ llentry_destroy (le);
+ return (-1);
+ }
+
+ /* This does not fail. */
+ llist_append (read_list, le);
+
+ pthread_mutex_unlock (&read_lock);
+ return (0);
+} /* int plugin_insert_read */
+
+int plugin_register_read (const char *name,
+ int (*callback) (void))
+{
+ read_func_t *rf;
+
rf = (read_func_t *) malloc (sizeof (read_func_t));
if (rf == NULL)
{
rf->rf_callback = (void *) callback;
rf->rf_udata.data = NULL;
rf->rf_udata.free_func = NULL;
+ rf->rf_group[0] = '\0';
sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_SIMPLE;
rf->rf_interval.tv_sec = 0;
rf->rf_interval.tv_nsec = 0;
rf->rf_effective_interval = rf->rf_interval;
- return (c_heap_insert (read_heap, rf));
+ return (plugin_insert_read (rf));
} /* int plugin_register_read */
-int plugin_register_complex_read (const char *name,
+int plugin_register_complex_read (const char *group, const char *name,
plugin_read_cb callback,
const struct timespec *interval,
user_data_t *user_data)
{
read_func_t *rf;
- if (read_heap == NULL)
- {
- read_heap = c_heap_create (plugin_compare_read_func);
- if (read_heap == NULL)
- {
- ERROR ("plugin_register_read: c_heap_create failed.");
- return (-1);
- }
- }
-
rf = (read_func_t *) malloc (sizeof (read_func_t));
if (rf == NULL)
{
memset (rf, 0, sizeof (read_func_t));
rf->rf_callback = (void *) callback;
+ if (group != NULL)
+ sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
+ else
+ rf->rf_group[0] = '\0';
sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
rf->rf_type = RF_COMPLEX;
if (interval != NULL)
rf->rf_udata = *user_data;
}
- return (c_heap_insert (read_heap, rf));
+ return (plugin_insert_read (rf));
} /* int plugin_register_complex_read */
int plugin_register_write (const char *name,
(void *) callback, ud));
} /* int plugin_register_flush */
-int plugin_register_shutdown (char *name,
+int plugin_register_shutdown (const char *name,
int (*callback) (void))
{
return (create_register_callback (&list_shutdown, name,
return (plugin_unregister (list_init, name));
}
-int plugin_unregister_read (const char *name)
+int plugin_unregister_read (const char *name) /* {{{ */
{
- /* TODO: Implement removal of a specific key from the heap. */
- assert (0);
- return (-1);
-}
+ llentry_t *le;
+ read_func_t *rf;
+
+ if (name == NULL)
+ return (-ENOENT);
+
+ pthread_mutex_lock (&read_lock);
+
+ if (read_list == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ return (-ENOENT);
+ }
+
+ le = llist_search (read_list, name);
+ if (le == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ WARNING ("plugin_unregister_read: No such read function: %s",
+ name);
+ return (-ENOENT);
+ }
+
+ llist_remove (read_list, le);
+
+ rf = le->value;
+ assert (rf != NULL);
+ rf->rf_type = RF_REMOVE;
+
+ pthread_mutex_unlock (&read_lock);
+
+ llentry_destroy (le);
+
+ DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name);
+
+ return (0);
+} /* }}} int plugin_unregister_read */
+
+static int compare_read_func_group (llentry_t *e, void *ud) /* {{{ */
+{
+ read_func_t *rf = e->value;
+ char *group = ud;
+
+ return strcmp (rf->rf_group, (const char *)group);
+} /* }}} int compare_read_func_group */
+
+int plugin_unregister_read_group (const char *group) /* {{{ */
+{
+ llentry_t *le;
+ read_func_t *rf;
+
+ int found = 0;
+
+ if (group == NULL)
+ return (-ENOENT);
+
+ pthread_mutex_lock (&read_lock);
+
+ if (read_list == NULL)
+ {
+ pthread_mutex_unlock (&read_lock);
+ return (-ENOENT);
+ }
+
+ while (42)
+ {
+ le = llist_search_custom (read_list,
+ compare_read_func_group, (void *)group);
+
+ if (le == NULL)
+ break;
+
+ ++found;
+
+ llist_remove (read_list, le);
+
+ rf = le->value;
+ assert (rf != NULL);
+ rf->rf_type = RF_REMOVE;
+
+ llentry_destroy (le);
+
+ DEBUG ("plugin_unregister_read_group: "
+ "Marked `%s' (group `%s') for removal.",
+ rf->rf_name, group);
+ }
+
+ pthread_mutex_unlock (&read_lock);
+
+ if (found == 0)
+ {
+ WARNING ("plugin_unregister_read_group: No such "
+ "group of read function: %s", group);
+ return (-ENOENT);
+ }
+
+ return (0);
+} /* }}} int plugin_unregister_read_group */
int plugin_unregister_write (const char *name)
{
{
read_func_t *rf;
- rf = c_head_get_root (read_heap);
+ rf = c_heap_get_root (read_heap);
if (rf == NULL)
break;
stop_read_threads ();
destroy_all_callbacks (&list_init);
+
+ pthread_mutex_lock (&read_lock);
+ llist_destroy (read_list);
+ read_list = NULL;
+ pthread_mutex_unlock (&read_lock);
+
destroy_read_heap ();
plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
(*callback) ();
}
- destroy_all_callbacks (&list_write);
+ /* Write plugins which use the `user_data' pointer usually need the
+ * same data available to the flush callback. If this is the case, set
+ * the free_function to NULL when registering the flush callback and to
+ * the real free function when registering the write callback. This way
+ * the data isn't freed twice. */
destroy_all_callbacks (&list_flush);
+ destroy_all_callbacks (&list_write);
+
destroy_all_callbacks (&list_notification);
destroy_all_callbacks (&list_shutdown);
destroy_all_callbacks (&list_log);
data_set_t *ds;
+ int free_meta_data = 0;
+
if ((vl == NULL) || (vl->type[0] == 0)
|| (vl->values == NULL) || (vl->values_len < 1))
{
- ERROR ("plugin_dispatch_values: Invalid value list.");
+ ERROR ("plugin_dispatch_values: Invalid value list "
+ "from plugin %s.", vl->plugin);
return (-1);
}
+ /* Free meta data only if the calling function didn't specify any. In
+ * this case matches and targets may add some and the calling function
+ * may not expect (and therefore free) that data. */
+ if (vl->meta == NULL)
+ free_meta_data = 1;
+
if (list_write == NULL)
c_complain_once (LOG_WARNING, &no_write_complaint,
"plugin_dispatch_values: No write callback has been "
if (c_avl_get (data_sets, vl->type, (void *) &ds) != 0)
{
- INFO ("plugin_dispatch_values: Dataset not found: %s", vl->type);
+ char ident[6 * DATA_MAX_NAME_LEN];
+
+ FORMAT_VL (ident, sizeof (ident), vl);
+ INFO ("plugin_dispatch_values: Dataset not found: %s "
+ "(from \"%s\"), check your types.db!",
+ vl->type, ident);
return (-1);
}
vl->values_len = saved_values_len;
}
+ if ((free_meta_data != 0) && (vl->meta != NULL))
+ {
+ meta_data_destroy (vl->meta);
+ vl->meta = NULL;
+ }
+
return (0);
} /* int plugin_dispatch_values */
va_list ap;
llentry_t *le;
- if (list_log == NULL)
- return;
-
#if !COLLECT_DEBUG
if (level >= LOG_DEBUG)
return;
msg[sizeof (msg) - 1] = '\0';
va_end (ap);
+ if (list_log == NULL)
+ {
+ fprintf (stderr, "%s\n", msg);
+ return;
+ }
+
le = llist_head (list_log);
while (le != NULL)
{