collectd-java(5): Updated the documentation.
[collectd.git] / src / plugin.c
index 6fd74e2..bba8e54 100644 (file)
 /*
  * Private structures
  */
+#define RF_SIMPLE  0
+#define RF_COMPLEX 1
 struct read_func_s
 {
        int wait_time;
        int wait_left;
-       int (*callback) (void);
+       int type;
+       union
+       {
+               int (*cb_simple) (void);
+               plugin_read_cb cb_complex;
+       } callback;
        enum { DONE = 0, TODO = 1, ACTIVE = 2 } needs_read;
+       user_data_t udata;
 };
 typedef struct read_func_s read_func_t;
 
-struct dispatch_queue_s;
-typedef struct dispatch_queue_s dispatch_queue_t;
-struct dispatch_queue_s
+struct write_func_s
 {
-       value_list_t *vl;
-       dispatch_queue_t *next;
+       plugin_write_cb callback;
+       user_data_t udata;
 };
+typedef struct write_func_s write_func_t;
 
 /*
  * Private variables
@@ -76,19 +83,11 @@ static c_avl_tree_t *data_sets;
 
 static char *plugindir = NULL;
 
-static int             read_loop = 0;
+static int             read_loop = 1;
 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
-static size_t          read_threads_num = 0;
-
-static int               dispatch_loop = 0;
-static pthread_mutex_t   dispatch_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t    dispatch_cond = PTHREAD_COND_INITIALIZER;
-static pthread_t        *dispatch_threads = NULL;
-static size_t            dispatch_threads_num = 0;
-static dispatch_queue_t *dispatch_head;
-static dispatch_queue_t *dispatch_tail;
+static int             read_threads_num = 0;
 
 /*
  * Static functions
@@ -134,6 +133,16 @@ static int register_callback (llist_t **list, const char *name, void *callback)
        return (0);
 } /* int register_callback */
 
+static void plugin_user_data_destroy (user_data_t *ud)
+{
+       if ((ud != NULL) && (ud->data != NULL) && (ud->free_func != NULL))
+       {
+               ud->free_func (ud->data);
+               ud->data = NULL;
+               ud->free_func = NULL;
+       }
+} /* void plugin_user_data_destroy */
+
 static int plugin_unregister (llist_t *list, const char *name)
 {
        llentry_t *e;
@@ -218,7 +227,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                                        (unsigned long int) pthread_self (), le->key);
                        pthread_mutex_unlock (&read_lock);
 
-                       status = rf->callback ();
+                       if (rf->type == RF_SIMPLE)
+                       {
+                               status = rf->callback.cb_simple ();
+                       }
+                       else
+                       {
+                               assert (rf->type == RF_COMPLEX);
+                               status = rf->callback.cb_complex (&rf->udata);
+                       }
                        done++;
 
                        if (status != 0)
@@ -260,138 +277,90 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
        return ((void *) 0);
 } /* void *plugin_read_thread */
 
-static void *plugin_dispatch_thread (void *arg)
+static void start_read_threads (int num)
 {
-       pthread_mutex_lock (&dispatch_lock);
-
-       while ((dispatch_loop != 0) || (dispatch_head != NULL))
-       {
-               dispatch_queue_t *qi;
-
-               if (dispatch_head == NULL)
-                       pthread_cond_wait (&dispatch_cond, &dispatch_lock);
+       int i;
 
-               if (dispatch_head == NULL)
-                       continue;
+       if (read_threads != NULL)
+               return;
 
-               qi = dispatch_head;
+       read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+       if (read_threads == NULL)
+       {
+               ERROR ("plugin: start_read_threads: calloc failed.");
+               return;
+       }
 
-               if (dispatch_head == dispatch_tail)
+       read_threads_num = 0;
+       for (i = 0; i < num; i++)
+       {
+               if (pthread_create (read_threads + read_threads_num, NULL,
+                                       plugin_read_thread, NULL) == 0)
                {
-                       dispatch_head = NULL;
-                       dispatch_tail = NULL;
+                       read_threads_num++;
                }
                else
                {
-                       dispatch_head = qi->next;
+                       ERROR ("plugin: start_read_threads: pthread_create failed.");
+                       return;
                }
+       } /* for (i) */
+} /* void start_read_threads */
 
-               pthread_mutex_unlock (&dispatch_lock);
-
-               plugin_dispatch_values (qi->vl);
-               sfree (qi->vl->values);
-               sfree (qi->vl);
-               sfree (qi);
-
-               pthread_mutex_lock (&dispatch_lock);
-       } /* while */
-
-       pthread_mutex_unlock (&dispatch_lock);
-
-       return ((void *) 0);
-} /* void *plugin_dispatch_thread */
-
-static int plugin_start_threads (size_t num,
-               pthread_t **tlist, size_t *tlist_num,
-               void *(*thread_main) (void *))
+static void stop_read_threads (void)
 {
-       size_t i;
-       pthread_t *new_tlist;
-       size_t new_tlist_num;
+       int i;
 
-       if (*tlist != NULL)
-               return (-1);
+       if (read_threads == NULL)
+               return;
 
-       new_tlist = (pthread_t *) calloc (num, sizeof (*new_tlist));
-       if (new_tlist == NULL)
-       {
-               ERROR ("plugin: plugin_start_threads: calloc failed.");
-               return (-1);
-       }
+       pthread_mutex_lock (&read_lock);
+       read_loop = 0;
+       DEBUG ("plugin: stop_read_threads: Signalling `read_cond'");
+       pthread_cond_broadcast (&read_cond);
+       pthread_mutex_unlock (&read_lock);
 
-       new_tlist_num = 0;
-       for (i = 0; i < num; i++)
+       for (i = 0; i < read_threads_num; i++)
        {
-               int status;
-
-               status = pthread_create (new_tlist + new_tlist_num,
-                               /* attr = */ NULL, thread_main,
-                               /* arg = */ (void *) 0);
-               if (status != 0)
+               if (pthread_join (read_threads[i], NULL) != 0)
                {
-                       ERROR ("plugin: plugin_start_threads: "
-                                       "pthread_create failed.");
-                       continue;
+                       ERROR ("plugin: stop_read_threads: pthread_join failed.");
                }
-
-               new_tlist_num++;
-       } /* for (i) */
-
-       if (new_tlist_num < 1)
-       {
-               ERROR ("plugin: plugin_start_threads: "
-                               "Creating threads failed.");
-               sfree (new_tlist);
-               return (-1);
+               read_threads[i] = (pthread_t) 0;
        }
+       sfree (read_threads);
+       read_threads_num = 0;
+} /* void stop_read_threads */
 
-       *tlist     = new_tlist;
-       *tlist_num = new_tlist_num;
-
-       return (0);
-} /* int plugin_start_threads */
-
-static int plugin_stop_threads (int *loop,
-               pthread_mutex_t *lock, pthread_cond_t *cond,
-               pthread_t **ret_tlist, size_t *ret_tlist_len)
+static int remove_read_functions (void)
 {
-       pthread_t *tlist;
-       size_t tlist_len;
-       size_t i;
+       llentry_t *this;
 
-       tlist     = *ret_tlist;
-       tlist_len = *ret_tlist_len;
-
-       if (tlist == NULL)
+       if (list_read == NULL)
                return (0);
 
-       DEBUG ("plugin_stop_threads: Stopping %zu threads.", tlist_len);
-
-       pthread_mutex_lock (lock);
+       this = llist_head (list_read);
+       while (this != NULL)
+       {
+               llentry_t *next;
+               read_func_t *rf;
 
-       *loop = 0;
+               next = this->next;
+               rf = (read_func_t *) this->value;
 
-       pthread_cond_broadcast (cond);
-       pthread_mutex_unlock (lock);
+               free (this->key);
 
-       for (i = 0; i < tlist_len; i++)
-       {
-               int status;
+               plugin_user_data_destroy (&rf->udata);
+               free (rf);
 
-               status = pthread_join (tlist[i], NULL);
-               if (status != 0)
-               {
-                       ERROR ("plugin_stop_threads: pthread_join failed.");
-               }
+               this = next;
        }
 
-       sfree (tlist);
-
-       *ret_tlist = NULL;
-       *ret_tlist_len = 0;
+       llist_destroy (list_read);
+       list_read = NULL;
 
        return (0);
-} /* int plugin_stop_threads */
+} /* }}} int remove_read_functions */
 
 /*
  * Public functions
@@ -530,19 +499,76 @@ int plugin_register_read (const char *name,
                return (-1);
        }
 
-       memset (rf, '\0', sizeof (read_func_t));
+       memset (rf, 0, sizeof (read_func_t));
        rf->wait_time = interval_g;
        rf->wait_left = 0;
-       rf->callback = callback;
+       rf->type = RF_SIMPLE;
+       rf->callback.cb_simple = callback;
        rf->needs_read = DONE;
+       rf->udata.data = NULL;
+       rf->udata.free_func = NULL;
 
        return (register_callback (&list_read, name, (void *) rf));
 } /* int plugin_register_read */
 
+int plugin_register_complex_read (const char *name,
+               plugin_read_cb callback, user_data_t *user_data)
+{
+       read_func_t *rf;
+
+       rf = (read_func_t *) malloc (sizeof (read_func_t));
+       if (rf == NULL)
+       {
+               ERROR ("plugin_register_complex_read: malloc failed.");
+               return (-1);
+       }
+
+       memset (rf, 0, sizeof (read_func_t));
+       rf->wait_time = interval_g;
+       rf->wait_left = 0;
+       rf->type = RF_COMPLEX;
+       rf->callback.cb_complex = callback;
+       rf->needs_read = DONE;
+
+       /* Set user data */
+       if (user_data == NULL)
+       {
+               rf->udata.data = NULL;
+               rf->udata.free_func = NULL;
+       }
+       else
+       {
+               rf->udata = *user_data;
+       }
+
+       return (register_callback (&list_read, name, (void *) rf));
+} /* int plugin_register_complex_read */
+
 int plugin_register_write (const char *name,
-               int (*callback) (const data_set_t *ds, const value_list_t *vl))
+               plugin_write_cb callback, user_data_t *user_data)
 {
-       return (register_callback (&list_write, name, (void *) callback));
+       write_func_t *wr;
+
+       wr = (write_func_t *) malloc (sizeof (*wr));
+       if (wr == NULL)
+       {
+               ERROR ("plugin_register_write: malloc failed.");
+               return (-1);
+       }
+       memset (wr, 0, sizeof (*wr));
+
+       wr->callback = callback;
+       if (user_data == NULL)
+       {
+               wr->udata.data = NULL;
+               wr->udata.free_func = NULL;
+       }
+       else
+       {
+               wr->udata = *user_data;
+       }
+
+       return (register_callback (&list_write, name, (void *) wr));
 } /* int plugin_register_write */
 
 int plugin_register_flush (const char *name,
@@ -626,6 +652,7 @@ int plugin_unregister_init (const char *name)
 int plugin_unregister_read (const char *name)
 {
        llentry_t *e;
+       read_func_t *rf;
 
        e = llist_search (list_read, name);
 
@@ -633,8 +660,12 @@ int plugin_unregister_read (const char *name)
                return (-1);
 
        llist_remove (list_read, e);
-       free (e->value);
+
+       rf = (read_func_t *) e->value;
+       plugin_user_data_destroy (&rf->udata);
+       free (rf);
        free (e->key);
+
        llentry_destroy (e);
 
        return (0);
@@ -642,7 +673,24 @@ int plugin_unregister_read (const char *name)
 
 int plugin_unregister_write (const char *name)
 {
-       return (plugin_unregister (list_write, name));
+       llentry_t *e;
+       write_func_t *wf;
+
+       e = llist_search (list_write, name);
+
+       if (e == NULL)
+               return (-1);
+
+       llist_remove (list_write, e);
+
+       wf = (write_func_t *) e->value;
+       plugin_user_data_destroy (&wf->udata);
+       free (wf);
+       free (e->key);
+
+       llentry_destroy (e);
+
+       return (0);
 }
 
 int plugin_unregister_flush (const char *name)
@@ -734,18 +782,8 @@ void plugin_init_all (void)
                rt = global_option_get ("ReadThreads");
                num = atoi (rt);
                if (num != -1)
-               {
-                       read_loop = 1;
-                       plugin_start_threads ((num > 0) ? ((size_t ) num) : 5,
-                                       &read_threads, &read_threads_num,
-                                       plugin_read_thread);
-               }
+                       start_read_threads ((num > 0) ? num : 5);
        }
-
-       dispatch_loop = 1;
-       plugin_start_threads (/* num = */ 1, /* FIXME: Make this number configurable */
-                       &dispatch_threads, &dispatch_threads_num,
-                       plugin_dispatch_thread);
 } /* void plugin_init_all */
 
 void plugin_read_all (void)
@@ -806,7 +844,17 @@ int plugin_read_all_once (void)
             le = le->next)
        {
                rf = (read_func_t *) le->value;
-               status = rf->callback ();
+
+               if (rf->type == RF_SIMPLE)
+               {
+                       status = rf->callback.cb_simple ();
+               }
+               else
+               {
+                       assert (rf->type == RF_COMPLEX);
+                       status = rf->callback.cb_complex (&rf->udata);
+               }
+
                if (status != 0)
                {
                        NOTICE ("read-function of plugin `%s' failed.",
@@ -821,7 +869,6 @@ int plugin_read_all_once (void)
 int plugin_write (const char *plugin, /* {{{ */
                const data_set_t *ds, const value_list_t *vl)
 {
-  int (*callback) (const data_set_t *ds, const value_list_t *vl);
   llentry_t *le;
   int status;
 
@@ -849,8 +896,10 @@ int plugin_write (const char *plugin, /* {{{ */
     le = llist_head (list_write);
     while (le != NULL)
     {
-      callback = le->value;
-      status = (*callback) (ds, vl);
+      write_func_t *wf = le->value;
+
+      DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
+      status = wf->callback (ds, vl, &wf->udata);
       if (status != 0)
         failure++;
       else
@@ -866,6 +915,7 @@ int plugin_write (const char *plugin, /* {{{ */
   }
   else /* plugin != NULL */
   {
+    write_func_t *wf;
     le = llist_head (list_write);
     while (le != NULL)
     {
@@ -878,8 +928,10 @@ int plugin_write (const char *plugin, /* {{{ */
     if (le == NULL)
       return (ENOENT);
 
-    callback = le->value;
-    status = (*callback) (ds, vl);
+    wf = le->value;
+
+    DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
+    status = wf->callback (ds, vl, &wf->udata);
   }
 
   return (status);
@@ -916,11 +968,8 @@ void plugin_shutdown_all (void)
        int (*callback) (void);
        llentry_t *le;
 
-       plugin_stop_threads (&read_loop, &read_lock, &read_cond,
-                       &read_threads, &read_threads_num);
-
-       plugin_stop_threads (&dispatch_loop, &dispatch_lock, &dispatch_cond,
-                       &dispatch_threads, &dispatch_threads_num);
+       stop_read_threads ();
+       remove_read_functions ();
 
        if (list_shutdown == NULL)
                return;
@@ -1045,76 +1094,6 @@ int plugin_dispatch_values (value_list_t *vl)
        return (0);
 } /* int plugin_dispatch_values */
 
-int plugin_dispatch_values_async (const value_list_t *vl)
-{
-       dispatch_queue_t *qi;
-       int i;
-
-       if (vl == NULL)
-               return (-EINVAL);
-
-       if (dispatch_threads_num < 1)
-       {
-               ERROR ("plugin_dispatch_values_async: "
-                               "No dispatch threads have been started!");
-#ifdef ENOTCONN
-               return (-ENOTCONN);
-#else
-               return (-1);
-#endif
-       }
-
-       qi = (dispatch_queue_t *) malloc (sizeof (*qi));
-       if (qi == NULL)
-       {
-               ERROR ("plugin_dispatch_values_async: malloc failed.");
-               return (-ENOMEM);
-       }
-       memset (qi, 0, sizeof (*qi));
-       qi->next = NULL;
-
-       qi->vl = (value_list_t *) malloc (sizeof (value_list_t));
-       if (qi->vl == NULL)
-       {
-               ERROR ("plugin_dispatch_values_async: malloc failed.");
-               sfree (qi);
-               return (-ENOMEM);
-       }
-       memcpy (qi->vl, vl, sizeof (value_list_t));
-       qi->vl->values = NULL;
-
-       qi->vl->values = (value_t *) calloc (qi->vl->values_len,
-                       sizeof (value_t));
-       if (qi->vl->values == NULL)
-       {
-               ERROR ("plugin_dispatch_values_async: malloc failed.");
-               sfree (qi->vl);
-               sfree (qi);
-               return (-ENOMEM);
-       }
-
-       for (i = 0; i < vl->values_len; i++)
-               qi->vl->values[i] = vl->values[i];
-
-       pthread_mutex_lock (&dispatch_lock);
-
-       if (dispatch_tail == NULL)
-       {
-               dispatch_head = qi;
-               dispatch_tail = qi;
-       }
-       else
-       {
-               dispatch_tail->next = qi;
-               dispatch_tail = qi;
-       }
-
-       pthread_cond_signal (&dispatch_cond);
-       pthread_mutex_unlock (&dispatch_lock);
-
-       return (0);
-} /* int plugin_dispatch_values_async */
-
 int plugin_dispatch_notification (const notification_t *notif)
 {
        int (*callback) (const notification_t *);