Merge branch 'collectd-4.8' into collectd-4.9
[collectd.git] / src / plugin.c
index 6139baf..5ff21c6 100644 (file)
@@ -51,6 +51,7 @@ typedef struct callback_func_s callback_func_t;
 
 #define RF_SIMPLE  0
 #define RF_COMPLEX 1
+#define RF_REMOVE  65535
 struct read_func_s
 {
        /* `read_func_t' "inherits" from `callback_func_t'.
@@ -84,6 +85,7 @@ static c_avl_tree_t *data_sets;
 static char *plugindir = NULL;
 
 static c_heap_t       *read_heap = NULL;
+static llist_t        *read_list;
 static int             read_loop = 1;
 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
@@ -149,7 +151,7 @@ static void destroy_read_heap (void) /* {{{ */
        {
                callback_func_t *cf;
 
-               cf = c_head_get_root (read_heap);
+               cf = c_heap_get_root (read_heap);
                if (cf == NULL)
                        break;
 
@@ -171,7 +173,7 @@ static int register_callback (llist_t **list, /* {{{ */
                *list = llist_create ();
                if (*list == NULL)
                {
-                       ERROR ("plugin: create_register_callback: "
+                       ERROR ("plugin: register_callback: "
                                        "llist_create failed.");
                        destroy_callback (cf);
                        return (-1);
@@ -181,7 +183,7 @@ static int register_callback (llist_t **list, /* {{{ */
        key = strdup (name);
        if (key == NULL)
        {
-               ERROR ("plugin: create_register_callback: strdup failed.");
+               ERROR ("plugin: register_callback: strdup failed.");
                destroy_callback (cf);
                return (-1);
        }
@@ -192,7 +194,7 @@ static int register_callback (llist_t **list, /* {{{ */
                le = llentry_create (key, cf);
                if (le == NULL)
                {
-                       ERROR ("plugin: create_register_callback: "
+                       ERROR ("plugin: register_callback: "
                                        "llentry_create failed.");
                        free (key);
                        destroy_callback (cf);
@@ -208,6 +210,10 @@ static int register_callback (llist_t **list, /* {{{ */
                old_cf = le->value;
                le->value = cf;
 
+               WARNING ("plugin: register_callback: "
+                               "a callback named `%s' already exists - "
+                               "overwriting the old entry!", name);
+
                destroy_callback (old_cf);
                sfree (key);
        }
@@ -268,7 +274,7 @@ static int plugin_unregister (llist_t *list, const char *name) /* {{{ */
  * object, but it will bitch about a shared object not having a
  * ``module_register'' symbol..
  */
-static int plugin_load_file (char *file)
+static int plugin_load_file (char *file, uint32_t flags)
 {
        lt_dlhandle dlh;
        void (*reg_handle) (void);
@@ -278,7 +284,24 @@ static int plugin_load_file (char *file)
        lt_dlinit ();
        lt_dlerror (); /* clear errors */
 
-       if ((dlh = lt_dlopen (file)) == NULL)
+#if LIBTOOL_VERSION == 2
+       if (flags & PLUGIN_FLAGS_GLOBAL) {
+               lt_dladvise advise;
+               lt_dladvise_init(&advise);
+               lt_dladvise_global(&advise);
+               dlh = lt_dlopenadvise(file, advise);
+               lt_dladvise_destroy(&advise);
+       } else {
+               dlh = lt_dlopen (file);
+       }
+#else /* if LIBTOOL_VERSION == 1 */
+       if (flags & PLUGIN_FLAGS_GLOBAL)
+               ERROR ("plugin_load_file: The global flag is not supported, "
+                               "libtool 2 is required for this.");
+       dlh = lt_dlopen (file);
+#endif
+
+       if (dlh == NULL)
        {
                const char *error = lt_dlerror ();
 
@@ -300,6 +323,13 @@ static int plugin_load_file (char *file)
        return (0);
 }
 
+static _Bool timeout_reached(struct timespec timeout)
+{
+       struct timeval now;
+       gettimeofday(&now, NULL);
+       return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000));
+}
+
 static void *plugin_read_thread (void __attribute__((unused)) *args)
 {
        while (read_loop != 0)
@@ -307,9 +337,11 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                read_func_t *rf;
                struct timeval now;
                int status;
+               int rf_type;
+               int rc;
 
                /* Get the read function that needs to be read next. */
-               rf = c_head_get_root (read_heap);
+               rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                {
                        struct timespec abstime;
@@ -342,8 +374,18 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                /* sleep until this entry is due,
                 * using pthread_cond_timedwait */
                pthread_mutex_lock (&read_lock);
-               pthread_cond_timedwait (&read_cond, &read_lock,
+               /* In pthread_cond_timedwait, spurious wakeups are possible
+                * (and really happen, at least on NetBSD with > 1 CPU), thus
+                * we need to re-evaluate the condition every time
+                * pthread_cond_timedwait returns. */
+               rc = 0;
+               while (!timeout_reached(rf->rf_next_read) && rc == 0) {
+                       rc = pthread_cond_timedwait (&read_cond, &read_lock,
                                &rf->rf_next_read);
+               }
+
+               /* Must hold `real_lock' when accessing `rf->rf_type'. */
+               rf_type = rf->rf_type;
                pthread_mutex_unlock (&read_lock);
 
                /* Check if we're supposed to stop.. This may have interrupted
@@ -355,9 +397,22 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                        break;
                }
 
+               /* The entry has been marked for deletion. The linked list
+                * entry has already been removed by `plugin_unregister_read'.
+                * All we have to do here is free the `read_func_t' and
+                * continue. */
+               if (rf_type == RF_REMOVE)
+               {
+                       DEBUG ("plugin_read_thread: Destroying the `%s' "
+                                       "callback.", rf->rf_name);
+                       destroy_callback ((callback_func_t *) rf);
+                       rf = NULL;
+                       continue;
+               }
+
                DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
 
-               if (rf->rf_type == RF_SIMPLE)
+               if (rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
 
@@ -368,6 +423,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                {
                        plugin_read_cb callback;
 
+                       assert (rf_type == RF_COMPLEX);
+
                        callback = rf->rf_callback;
                        status = (*callback) (&rf->rf_udata);
                }
@@ -515,7 +572,7 @@ void plugin_set_dir (const char *dir)
 }
 
 #define BUFSIZE 512
-int plugin_load (const char *type)
+int plugin_load (const char *type, uint32_t flags)
 {
        DIR  *dh;
        const char *dir;
@@ -577,7 +634,7 @@ int plugin_load (const char *type)
                        continue;
                }
 
-               if (plugin_load_file (filename) == 0)
+               if (plugin_load_file (filename, flags) == 0)
                {
                        /* success */
                        ret = 0;
@@ -641,22 +698,67 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1)
                return (0);
 } /* int plugin_compare_read_func */
 
-int plugin_register_read (const char *name,
-               int (*callback) (void))
+/* Add a read function to both, the heap and a linked list. The linked list if
+ * used to look-up read functions, especially for the remove function. The heap
+ * is used to determine which plugin to read next. */
+static int plugin_insert_read (read_func_t *rf)
 {
-       read_func_t *rf;
+       int status;
+       llentry_t *le;
+
+       pthread_mutex_lock (&read_lock);
+
+       if (read_list == NULL)
+       {
+               read_list = llist_create ();
+               if (read_list == NULL)
+               {
+                       pthread_mutex_unlock (&read_lock);
+                       ERROR ("plugin_insert_read: read_list failed.");
+                       return (-1);
+               }
+       }
 
        if (read_heap == NULL)
        {
                read_heap = c_heap_create (plugin_compare_read_func);
                if (read_heap == NULL)
                {
-                       ERROR ("plugin_register_read: "
-                                       "c_heap_create failed.");
+                       pthread_mutex_unlock (&read_lock);
+                       ERROR ("plugin_insert_read: c_heap_create failed.");
                        return (-1);
                }
        }
 
+       le = llentry_create (rf->rf_name, rf);
+       if (le == NULL)
+       {
+               pthread_mutex_unlock (&read_lock);
+               ERROR ("plugin_insert_read: llentry_create failed.");
+               return (-1);
+       }
+
+       status = c_heap_insert (read_heap, rf);
+       if (status != 0)
+       {
+               pthread_mutex_unlock (&read_lock);
+               ERROR ("plugin_insert_read: c_heap_insert failed.");
+               llentry_destroy (le);
+               return (-1);
+       }
+
+       /* This does not fail. */
+       llist_append (read_list, le);
+
+       pthread_mutex_unlock (&read_lock);
+       return (0);
+} /* int plugin_insert_read */
+
+int plugin_register_read (const char *name,
+               int (*callback) (void))
+{
+       read_func_t *rf;
+
        rf = (read_func_t *) malloc (sizeof (read_func_t));
        if (rf == NULL)
        {
@@ -676,7 +778,7 @@ int plugin_register_read (const char *name,
        rf->rf_interval.tv_nsec = 0;
        rf->rf_effective_interval = rf->rf_interval;
 
-       return (c_heap_insert (read_heap, rf));
+       return (plugin_insert_read (rf));
 } /* int plugin_register_read */
 
 int plugin_register_complex_read (const char *name,
@@ -686,16 +788,6 @@ int plugin_register_complex_read (const char *name,
 {
        read_func_t *rf;
 
-       if (read_heap == NULL)
-       {
-               read_heap = c_heap_create (plugin_compare_read_func);
-               if (read_heap == NULL)
-               {
-                       ERROR ("plugin_register_read: c_heap_create failed.");
-                       return (-1);
-               }
-       }
-
        rf = (read_func_t *) malloc (sizeof (read_func_t));
        if (rf == NULL)
        {
@@ -724,7 +816,7 @@ int plugin_register_complex_read (const char *name,
                rf->rf_udata = *user_data;
        }
 
-       return (c_heap_insert (read_heap, rf));
+       return (plugin_insert_read (rf));
 } /* int plugin_register_complex_read */
 
 int plugin_register_write (const char *name,
@@ -816,12 +908,45 @@ int plugin_unregister_init (const char *name)
        return (plugin_unregister (list_init, name));
 }
 
-int plugin_unregister_read (const char *name)
+int plugin_unregister_read (const char *name) /* {{{ */
 {
-       /* TODO: Implement removal of a specific key from the heap. */
-       assert (0);
-       return (-1);
-}
+       llentry_t *le;
+       read_func_t *rf;
+
+       if (name == NULL)
+               return (-ENOENT);
+
+       pthread_mutex_lock (&read_lock);
+
+       if (read_list == NULL)
+       {
+               pthread_mutex_unlock (&read_lock);
+               return (-ENOENT);
+       }
+
+       le = llist_search (read_list, name);
+       if (le == NULL)
+       {
+               pthread_mutex_unlock (&read_lock);
+               WARNING ("plugin_unregister_read: No such read function: %s",
+                               name);
+               return (-ENOENT);
+       }
+
+       llist_remove (read_list, le);
+
+       rf = le->value;
+       assert (rf != NULL);
+       rf->rf_type = RF_REMOVE;
+
+       pthread_mutex_unlock (&read_lock);
+
+       llentry_destroy (le);
+
+       DEBUG ("plugin_unregister_read: Marked `%s' for removal.", name);
+
+       return (0);
+} /* }}} int plugin_unregister_read */
 
 int plugin_unregister_write (const char *name)
 {
@@ -948,7 +1073,7 @@ int plugin_read_all_once (void)
        {
                read_func_t *rf;
 
-               rf = c_head_get_root (read_heap);
+               rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                        break;
 
@@ -1093,6 +1218,12 @@ void plugin_shutdown_all (void)
        stop_read_threads ();
 
        destroy_all_callbacks (&list_init);
+
+       pthread_mutex_lock (&read_lock);
+       llist_destroy (read_list);
+       read_list = NULL;
+       pthread_mutex_unlock (&read_lock);
+
        destroy_read_heap ();
 
        plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
@@ -1119,8 +1250,14 @@ void plugin_shutdown_all (void)
                (*callback) ();
        }
 
-       destroy_all_callbacks (&list_write);
+       /* Write plugins which use the `user_data' pointer usually need the
+        * same data available to the flush callback. If this is the case, set
+        * the free_function to NULL when registering the flush callback and to
+        * the real free function when registering the write callback. This way
+        * the data isn't freed twice. */
        destroy_all_callbacks (&list_flush);
+       destroy_all_callbacks (&list_write);
+
        destroy_all_callbacks (&list_notification);
        destroy_all_callbacks (&list_shutdown);
        destroy_all_callbacks (&list_log);
@@ -1136,6 +1273,8 @@ int plugin_dispatch_values (value_list_t *vl)
 
        data_set_t *ds;
 
+       int free_meta_data = 0;
+
        if ((vl == NULL) || (vl->type[0] == 0)
                        || (vl->values == NULL) || (vl->values_len < 1))
        {
@@ -1143,6 +1282,12 @@ int plugin_dispatch_values (value_list_t *vl)
                return (-1);
        }
 
+       /* Free meta data only if the calling function didn't specify any. In
+        * this case matches and targets may add some and the calling function
+        * may not expect (and therefore free) that data. */
+       if (vl->meta == NULL)
+               free_meta_data = 1;
+
        if (list_write == NULL)
                c_complain_once (LOG_WARNING, &no_write_complaint,
                                "plugin_dispatch_values: No write callback has been "
@@ -1283,6 +1428,12 @@ int plugin_dispatch_values (value_list_t *vl)
                vl->values_len = saved_values_len;
        }
 
+       if ((free_meta_data != 0) && (vl->meta != NULL))
+       {
+               meta_data_destroy (vl->meta);
+               vl->meta = NULL;
+       }
+
        return (0);
 } /* int plugin_dispatch_values */
 
@@ -1329,9 +1480,6 @@ void plugin_log (int level, const char *format, ...)
        va_list ap;
        llentry_t *le;
 
-       if (list_log == NULL)
-               return;
-
 #if !COLLECT_DEBUG
        if (level >= LOG_DEBUG)
                return;
@@ -1342,6 +1490,12 @@ void plugin_log (int level, const char *format, ...)
        msg[sizeof (msg) - 1] = '\0';
        va_end (ap);
 
+       if (list_log == NULL)
+       {
+               fprintf (stderr, "%s\n", msg);
+               return;
+       }
+
        le = llist_head (list_log);
        while (le != NULL)
        {