Merge branch 'master' into ff/java
[collectd.git] / src / plugin.c
index 367c0d1..d185ef2 100644 (file)
 /*
  * Private structures
  */
+struct callback_func_s
+{
+       void *cf_callback;
+       user_data_t cf_udata;
+};
+typedef struct callback_func_s callback_func_t;
+
+#define RF_SIMPLE  0
+#define RF_COMPLEX 1
 struct read_func_s
 {
-       int wait_time;
-       int wait_left;
-       int (*callback) (void);
-       enum { DONE = 0, TODO = 1, ACTIVE = 2 } needs_read;
+       /* `read_func_t' "inherits" from `callback_func_t'.
+        * The `rf_super' member MUST be the first one in this structure! */
+#define rf_callback rf_super.cf_callback
+#define rf_udata rf_super.cf_udata
+       callback_func_t rf_super;
+       int rf_type;
+       int rf_wait_time;
+       int rf_wait_left;
+       enum { DONE = 0, TODO = 1, ACTIVE = 2 } rf_needs_read;
 };
 typedef struct read_func_s read_func_t;
 
@@ -85,26 +99,81 @@ static const char *plugin_get_dir (void)
                return (plugindir);
 }
 
-static int register_callback (llist_t **list, const char *name, void *callback)
+static void destroy_callback (callback_func_t *cf) /* {{{ */
+{
+       if (cf == NULL)
+               return;
+
+       if ((cf->cf_udata.data != NULL) && (cf->cf_udata.free_func != NULL))
+       {
+               cf->cf_udata.free_func (cf->cf_udata.data);
+               cf->cf_udata.data = NULL;
+               cf->cf_udata.free_func = NULL;
+       }
+       sfree (cf);
+} /* }}} void destroy_callback */
+
+static void destroy_all_callbacks (llist_t **list) /* {{{ */
+{
+       llentry_t *le;
+
+       if (*list == NULL)
+               return;
+
+       le = llist_head (*list);
+       while (le != NULL)
+       {
+               llentry_t *le_next;
+
+               le_next = le->next;
+
+               sfree (le->key);
+               destroy_callback (le->value);
+               le->value = NULL;
+
+               le = le_next;
+       }
+
+       llist_destroy (*list);
+       *list = NULL;
+} /* }}} void destroy_all_callbacks */
+
+static int register_callback (llist_t **list, /* {{{ */
+               const char *name, callback_func_t *cf)
 {
        llentry_t *le;
        char *key;
 
-       if ((*list == NULL)
-                       && ((*list = llist_create ()) == NULL))
+       if (*list == NULL)
+       {
+               *list = llist_create ();
+               if (*list == NULL)
+               {
+                       ERROR ("plugin: create_register_callback: "
+                                       "llist_create failed.");
+                       destroy_callback (cf);
+                       return (-1);
+               }
+       }
+
+       key = strdup (name);
+       if (key == NULL)
+       {
+               ERROR ("plugin: create_register_callback: strdup failed.");
+               destroy_callback (cf);
                return (-1);
+       }
 
        le = llist_search (*list, name);
        if (le == NULL)
        {
-               key = strdup (name);
-               if (key == NULL)
-                       return (-1);
-
-               le = llentry_create (key, callback);
+               le = llentry_create (key, cf);
                if (le == NULL)
                {
+                       ERROR ("plugin: create_register_callback: "
+                                       "llentry_create failed.");
                        free (key);
+                       destroy_callback (cf);
                        return (-1);
                }
 
@@ -112,27 +181,67 @@ static int register_callback (llist_t **list, const char *name, void *callback)
        }
        else
        {
-               le->value = callback;
+               callback_func_t *old_cf;
+
+               old_cf = le->value;
+               le->value = cf;
+
+               destroy_callback (old_cf);
+               sfree (key);
        }
 
        return (0);
-} /* int register_callback */
+} /* }}} int register_callback */
 
-static int plugin_unregister (llist_t *list, const char *name)
+static int create_register_callback (llist_t **list, /* {{{ */
+               const char *name, void *callback, user_data_t *ud)
+{
+       char *key;
+       callback_func_t *cf;
+
+       cf = (callback_func_t *) malloc (sizeof (*cf));
+       if (cf == NULL)
+       {
+               ERROR ("plugin: create_register_callback: malloc failed.");
+               sfree (key);
+               return (-1);
+       }
+       memset (cf, 0, sizeof (*cf));
+
+       cf->cf_callback = callback;
+       if (ud == NULL)
+       {
+               cf->cf_udata.data = NULL;
+               cf->cf_udata.free_func = NULL;
+       }
+       else
+       {
+               cf->cf_udata = *ud;
+       }
+
+       return (register_callback (list, name, cf));
+} /* }}} int create_register_callback */
+
+static int plugin_unregister (llist_t *list, const char *name) /* {{{ */
 {
        llentry_t *e;
 
-       e = llist_search (list, name);
+       if (list == NULL)
+               return (-1);
 
+       e = llist_search (list, name);
        if (e == NULL)
                return (-1);
 
        llist_remove (list, e);
-       free (e->key);
+
+       sfree (e->key);
+       destroy_callback (e->value);
+
        llentry_destroy (e);
 
        return (0);
-} /* int plugin_unregister */
+} /* }}} int plugin_unregister */
 
 /*
  * (Try to) load the shared object `file'. Won't complain if it isn't a shared
@@ -189,44 +298,58 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                {
                        rf = (read_func_t *) le->value;
 
-                       if (rf->needs_read != TODO)
+                       if (rf->rf_needs_read != TODO)
                        {
                                le = le->next;
                                continue;
                        }
 
                        /* We will do this read function */
-                       rf->needs_read = ACTIVE;
+                       rf->rf_needs_read = ACTIVE;
 
                        DEBUG ("[thread #%5lu] plugin: plugin_read_thread: Handling %s",
                                        (unsigned long int) pthread_self (), le->key);
                        pthread_mutex_unlock (&read_lock);
 
-                       status = rf->callback ();
+                       if (rf->rf_type == RF_SIMPLE)
+                       {
+                               int (*callback) (void);
+
+                               callback = rf->rf_callback;
+                               status = (*callback) ();
+                       }
+                       else
+                       {
+                               plugin_read_cb callback;
+
+                               callback = rf->rf_callback;
+                               status = (*callback) (&rf->rf_udata);
+                       }
+
                        done++;
 
                        if (status != 0)
                        {
-                               if (rf->wait_time < interval_g)
-                                       rf->wait_time = interval_g;
-                               rf->wait_left = rf->wait_time;
-                               rf->wait_time = rf->wait_time * 2;
-                               if (rf->wait_time > 86400)
-                                       rf->wait_time = 86400;
+                               if (rf->rf_wait_time < interval_g)
+                                       rf->rf_wait_time = interval_g;
+                               rf->rf_wait_left = rf->rf_wait_time;
+                               rf->rf_wait_time = rf->rf_wait_time * 2;
+                               if (rf->rf_wait_time > 86400)
+                                       rf->rf_wait_time = 86400;
 
                                NOTICE ("read-function of plugin `%s' "
                                                "failed. Will suspend it for %i "
-                                               "seconds.", le->key, rf->wait_left);
+                                               "seconds.", le->key, rf->rf_wait_left);
                        }
                        else
                        {
-                               rf->wait_left = 0;
-                               rf->wait_time = interval_g;
+                               rf->rf_wait_left = 0;
+                               rf->rf_wait_time = interval_g;
                        }
 
                        pthread_mutex_lock (&read_lock);
 
-                       rf->needs_read = DONE;
+                       rf->rf_needs_read = DONE;
                        le = le->next;
                } /* while (le != NULL) */
 
@@ -244,7 +367,7 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
        return ((void *) 0);
 } /* void *plugin_read_thread */
 
-static void start_threads (int num)
+static void start_read_threads (int num)
 {
        int i;
 
@@ -254,7 +377,7 @@ static void start_threads (int num)
        read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
        if (read_threads == NULL)
        {
-               ERROR ("plugin: start_threads: calloc failed.");
+               ERROR ("plugin: start_read_threads: calloc failed.");
                return;
        }
 
@@ -268,13 +391,13 @@ static void start_threads (int num)
                }
                else
                {
-                       ERROR ("plugin: start_threads: pthread_create failed.");
+                       ERROR ("plugin: start_read_threads: pthread_create failed.");
                        return;
                }
        } /* for (i) */
-} /* void start_threads */
+} /* void start_read_threads */
 
-static void stop_threads (void)
+static void stop_read_threads (void)
 {
        int i;
 
@@ -283,7 +406,7 @@ static void stop_threads (void)
 
        pthread_mutex_lock (&read_lock);
        read_loop = 0;
-       DEBUG ("plugin: stop_threads: Signalling `read_cond'");
+       DEBUG ("plugin: stop_read_threads: Signalling `read_cond'");
        pthread_cond_broadcast (&read_cond);
        pthread_mutex_unlock (&read_lock);
 
@@ -291,13 +414,13 @@ static void stop_threads (void)
        {
                if (pthread_join (read_threads[i], NULL) != 0)
                {
-                       ERROR ("plugin: stop_threads: pthread_join failed.");
+                       ERROR ("plugin: stop_read_threads: pthread_join failed.");
                }
                read_threads[i] = (pthread_t) 0;
        }
        sfree (read_threads);
        read_threads_num = 0;
-} /* void stop_threads */
+} /* void stop_read_threads */
 
 /*
  * Public functions
@@ -419,7 +542,8 @@ int plugin_register_complex_config (const char *type,
 int plugin_register_init (const char *name,
                int (*callback) (void))
 {
-       return (register_callback (&list_init, name, (void *) callback));
+       return (create_register_callback (&list_init, name, (void *) callback,
+                               /* user_data = */ NULL));
 } /* plugin_register_init */
 
 int plugin_register_read (const char *name,
@@ -436,31 +560,70 @@ int plugin_register_read (const char *name,
                return (-1);
        }
 
-       memset (rf, '\0', sizeof (read_func_t));
-       rf->wait_time = interval_g;
-       rf->wait_left = 0;
-       rf->callback = callback;
-       rf->needs_read = DONE;
+       memset (rf, 0, sizeof (read_func_t));
+       rf->rf_callback = (void *) callback;
+       rf->rf_udata.data = NULL;
+       rf->rf_udata.free_func = NULL;
+       rf->rf_wait_time = interval_g;
+       rf->rf_wait_left = 0;
+       rf->rf_type = RF_SIMPLE;
+       rf->rf_needs_read = DONE;
 
-       return (register_callback (&list_read, name, (void *) rf));
+       return (register_callback (&list_read, name, (callback_func_t *) rf));
 } /* int plugin_register_read */
 
+int plugin_register_complex_read (const char *name,
+               plugin_read_cb callback, user_data_t *user_data)
+{
+       read_func_t *rf;
+
+       rf = (read_func_t *) malloc (sizeof (read_func_t));
+       if (rf == NULL)
+       {
+               ERROR ("plugin_register_complex_read: malloc failed.");
+               return (-1);
+       }
+
+       memset (rf, 0, sizeof (read_func_t));
+       rf->rf_callback = (void *) callback;
+       rf->rf_wait_time = interval_g;
+       rf->rf_wait_left = 0;
+       rf->rf_type = RF_COMPLEX;
+       rf->rf_needs_read = DONE;
+
+       /* Set user data */
+       if (user_data == NULL)
+       {
+               rf->rf_udata.data = NULL;
+               rf->rf_udata.free_func = NULL;
+       }
+       else
+       {
+               rf->rf_udata = *user_data;
+       }
+
+       return (register_callback (&list_read, name, (callback_func_t *) rf));
+} /* int plugin_register_complex_read */
+
 int plugin_register_write (const char *name,
-               int (*callback) (const data_set_t *ds, const value_list_t *vl))
+               plugin_write_cb callback, user_data_t *ud)
 {
-       return (register_callback (&list_write, name, (void *) callback));
+       return (create_register_callback (&list_write, name,
+                               (void *) callback, ud));
 } /* int plugin_register_write */
 
 int plugin_register_flush (const char *name,
-               int (*callback) (const int timeout, const char *identifier))
+               plugin_flush_cb callback, user_data_t *ud)
 {
-       return (register_callback (&list_flush, name, (void *) callback));
+       return (create_register_callback (&list_flush, name,
+                               (void *) callback, ud));
 } /* int plugin_register_flush */
 
 int plugin_register_shutdown (char *name,
                int (*callback) (void))
 {
-       return (register_callback (&list_shutdown, name, (void *) callback));
+       return (create_register_callback (&list_shutdown, name,
+                               (void *) callback, /* user_data = */ NULL));
 } /* int plugin_register_shutdown */
 
 int plugin_register_data_set (const data_set_t *ds)
@@ -500,16 +663,18 @@ int plugin_register_data_set (const data_set_t *ds)
        return (c_avl_insert (data_sets, (void *) ds_copy->type, (void *) ds_copy));
 } /* int plugin_register_data_set */
 
-int plugin_register_log (char *name,
-               void (*callback) (int priority, const char *msg))
+int plugin_register_log (const char *name,
+               plugin_log_cb callback, user_data_t *ud)
 {
-       return (register_callback (&list_log, name, (void *) callback));
+       return (create_register_callback (&list_log, name,
+                               (void *) callback, ud));
 } /* int plugin_register_log */
 
 int plugin_register_notification (const char *name,
-               int (*callback) (const notification_t *notif))
+               plugin_notification_cb callback, user_data_t *ud)
 {
-       return (register_callback (&list_notification, name, (void *) callback));
+       return (create_register_callback (&list_notification, name,
+                               (void *) callback, ud));
 } /* int plugin_register_log */
 
 int plugin_unregister_config (const char *name)
@@ -531,19 +696,7 @@ int plugin_unregister_init (const char *name)
 
 int plugin_unregister_read (const char *name)
 {
-       llentry_t *e;
-
-       e = llist_search (list_read, name);
-
-       if (e == NULL)
-               return (-1);
-
-       llist_remove (list_read, e);
-       free (e->value);
-       free (e->key);
-       llentry_destroy (e);
-
-       return (0);
+       return (plugin_unregister (list_read, name));
 }
 
 int plugin_unregister_write (const char *name)
@@ -590,7 +743,6 @@ int plugin_unregister_notification (const char *name)
 void plugin_init_all (void)
 {
        const char *chain_name;
-       int (*callback) (void);
        llentry_t *le;
        int status;
 
@@ -613,7 +765,11 @@ void plugin_init_all (void)
        le = llist_head (list_init);
        while (le != NULL)
        {
-               callback = (int (*) (void)) le->value;
+               callback_func_t *cf;
+               plugin_init_cb callback;
+
+               cf = le->value;
+               callback = cf->cf_callback;
                status = (*callback) ();
 
                if (status != 0)
@@ -640,7 +796,7 @@ void plugin_init_all (void)
                rt = global_option_get ("ReadThreads");
                num = atoi (rt);
                if (num != -1)
-                       start_threads ((num > 0) ? num : 5);
+                       start_read_threads ((num > 0) ? num : 5);
        }
 } /* void plugin_init_all */
 
@@ -661,18 +817,18 @@ void plugin_read_all (void)
        {
                rf = (read_func_t *) le->value;
 
-               if (rf->needs_read != DONE)
+               if (rf->rf_needs_read != DONE)
                {
                        le = le->next;
                        continue;
                }
 
-               if (rf->wait_left > 0)
-                       rf->wait_left -= interval_g;
+               if (rf->rf_wait_left > 0)
+                       rf->rf_wait_left -= interval_g;
 
-               if (rf->wait_left <= 0)
+               if (rf->rf_wait_left <= 0)
                {
-                       rf->needs_read = TODO;
+                       rf->rf_needs_read = TODO;
                }
 
                le = le->next;
@@ -702,7 +858,22 @@ int plugin_read_all_once (void)
             le = le->next)
        {
                rf = (read_func_t *) le->value;
-               status = rf->callback ();
+
+               if (rf->rf_type == RF_SIMPLE)
+               {
+                       int (*callback) (void);
+
+                       callback = rf->rf_callback;
+                       status = (*callback) ();
+               }
+               else
+               {
+                       plugin_read_cb callback;
+
+                       callback = rf->rf_callback;
+                       status = (*callback) (&rf->rf_udata);
+               }
+
                if (status != 0)
                {
                        NOTICE ("read-function of plugin `%s' failed.",
@@ -717,7 +888,6 @@ int plugin_read_all_once (void)
 int plugin_write (const char *plugin, /* {{{ */
                const data_set_t *ds, const value_list_t *vl)
 {
-  int (*callback) (const data_set_t *ds, const value_list_t *vl);
   llentry_t *le;
   int status;
 
@@ -745,8 +915,12 @@ int plugin_write (const char *plugin, /* {{{ */
     le = llist_head (list_write);
     while (le != NULL)
     {
-      callback = le->value;
-      status = (*callback) (ds, vl);
+      callback_func_t *cf = le->value;
+      plugin_write_cb callback;
+
+      DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
+      callback = cf->cf_callback;
+      status = (*callback) (ds, vl, &cf->cf_udata);
       if (status != 0)
         failure++;
       else
@@ -762,6 +936,9 @@ int plugin_write (const char *plugin, /* {{{ */
   }
   else /* plugin != NULL */
   {
+    callback_func_t *cf;
+    plugin_write_cb callback;
+
     le = llist_head (list_write);
     while (le != NULL)
     {
@@ -774,8 +951,11 @@ int plugin_write (const char *plugin, /* {{{ */
     if (le == NULL)
       return (ENOENT);
 
-    callback = le->value;
-    status = (*callback) (ds, vl);
+    cf = le->value;
+
+    DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
+    callback = cf->cf_callback;
+    status = (*callback) (ds, vl, &cf->cf_udata);
   }
 
   return (status);
@@ -783,7 +963,6 @@ int plugin_write (const char *plugin, /* {{{ */
 
 int plugin_flush (const char *plugin, int timeout, const char *identifier)
 {
-  int (*callback) (int timeout, const char *identifier);
   llentry_t *le;
 
   if (list_flush == NULL)
@@ -792,6 +971,9 @@ int plugin_flush (const char *plugin, int timeout, const char *identifier)
   le = llist_head (list_flush);
   while (le != NULL)
   {
+    callback_func_t *cf;
+    plugin_flush_cb callback;
+
     if ((plugin != NULL)
         && (strcmp (plugin, le->key) != 0))
     {
@@ -799,8 +981,10 @@ int plugin_flush (const char *plugin, int timeout, const char *identifier)
       continue;
     }
 
-    callback = (int (*) (int, const char *)) le->value;
-    (*callback) (timeout, identifier);
+    cf = le->value;
+    callback = cf->cf_callback;
+
+    (*callback) (timeout, identifier, &cf->cf_udata);
 
     le = le->next;
   }
@@ -809,18 +993,27 @@ int plugin_flush (const char *plugin, int timeout, const char *identifier)
 
 void plugin_shutdown_all (void)
 {
-       int (*callback) (void);
        llentry_t *le;
 
-       stop_threads ();
+       stop_read_threads ();
 
-       if (list_shutdown == NULL)
-               return;
+       destroy_all_callbacks (&list_init);
+       destroy_all_callbacks (&list_read);
+
+       plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
+                       /* identifier = */ NULL);
+
+       le = NULL;
+       if (list_shutdown != NULL)
+               le = llist_head (list_shutdown);
 
-       le = llist_head (list_shutdown);
        while (le != NULL)
        {
-               callback = (int (*) (void)) le->value;
+               callback_func_t *cf;
+               plugin_shutdown_cb callback;
+
+               cf = le->value;
+               callback = cf->cf_callback;
 
                /* Advance the pointer before calling the callback allows
                 * shutdown functions to unregister themselves. If done the
@@ -830,6 +1023,12 @@ void plugin_shutdown_all (void)
 
                (*callback) ();
        }
+
+       destroy_all_callbacks (&list_write);
+       destroy_all_callbacks (&list_flush);
+       destroy_all_callbacks (&list_notification);
+       destroy_all_callbacks (&list_shutdown);
+       destroy_all_callbacks (&list_log);
 } /* void plugin_shutdown_all */
 
 int plugin_dispatch_values (value_list_t *vl)
@@ -988,7 +1187,6 @@ int plugin_dispatch_values (value_list_t *vl)
 
 int plugin_dispatch_notification (const notification_t *notif)
 {
-       int (*callback) (const notification_t *);
        llentry_t *le;
        /* Possible TODO: Add flap detection here */
 
@@ -1004,8 +1202,19 @@ int plugin_dispatch_notification (const notification_t *notif)
        le = llist_head (list_notification);
        while (le != NULL)
        {
-               callback = (int (*) (const notification_t *)) le->value;
-               (*callback) (notif);
+               callback_func_t *cf;
+               plugin_notification_cb callback;
+               int status;
+
+               cf = le->value;
+               callback = cf->cf_callback;
+               status = (*callback) (notif, &cf->cf_udata);
+               if (status != 0)
+               {
+                       WARNING ("plugin_dispatch_notification: Notification "
+                                       "callback %s returned %i.",
+                                       le->key, status);
+               }
 
                le = le->next;
        }
@@ -1017,8 +1226,6 @@ void plugin_log (int level, const char *format, ...)
 {
        char msg[1024];
        va_list ap;
-
-       void (*callback) (int, const char *);
        llentry_t *le;
 
        if (list_log == NULL)
@@ -1037,8 +1244,13 @@ void plugin_log (int level, const char *format, ...)
        le = llist_head (list_log);
        while (le != NULL)
        {
-               callback = (void (*) (int, const char *)) le->value;
-               (*callback) (level, msg);
+               callback_func_t *cf;
+               plugin_log_cb callback;
+
+               cf = le->value;
+               callback = cf->cf_callback;
+
+               (*callback) (level, msg, &cf->cf_udata);
 
                le = le->next;
        }
@@ -1231,3 +1443,5 @@ int plugin_notification_meta_free (notification_meta_t *n)
 
   return (0);
 } /* int plugin_notification_meta_free */
+
+/* vim: set sw=8 ts=8 noet fdm=marker : */