/*
* Private structures
*/
+#define RF_SIMPLE 0
+#define RF_COMPLEX 1
struct read_func_s
{
int wait_time;
int wait_left;
- int (*callback) (void);
+ int type;
+ union
+ {
+ int (*cb_simple) (void);
+ plugin_read_cb cb_complex;
+ } callback;
enum { DONE = 0, TODO = 1, ACTIVE = 2 } needs_read;
+ user_data_t udata;
};
typedef struct read_func_s read_func_t;
-struct dispatch_queue_s;
-typedef struct dispatch_queue_s dispatch_queue_t;
-struct dispatch_queue_s
+struct write_func_s
{
- value_list_t *vl;
- dispatch_queue_t *next;
+ plugin_write_cb callback;
+ user_data_t udata;
};
+typedef struct write_func_s write_func_t;
/*
* Private variables
static char *plugindir = NULL;
-static int read_loop = 0;
+static int read_loop = 1;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads = NULL;
-static size_t read_threads_num = 0;
-
-static int dispatch_loop = 0;
-static pthread_mutex_t dispatch_lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t dispatch_cond = PTHREAD_COND_INITIALIZER;
-static pthread_t *dispatch_threads = NULL;
-static size_t dispatch_threads_num = 0;
-static dispatch_queue_t *dispatch_head;
-static dispatch_queue_t *dispatch_tail;
+static int read_threads_num = 0;
/*
* Static functions
return (0);
} /* int register_callback */
+static void plugin_user_data_destroy (user_data_t *ud)
+{
+ if ((ud != NULL) && (ud->data != NULL) && (ud->free_func != NULL))
+ {
+ ud->free_func (ud->data);
+ ud->data = NULL;
+ ud->free_func = NULL;
+ }
+} /* void plugin_user_data_destroy */
+
static int plugin_unregister (llist_t *list, const char *name)
{
llentry_t *e;
(unsigned long int) pthread_self (), le->key);
pthread_mutex_unlock (&read_lock);
- status = rf->callback ();
+ if (rf->type == RF_SIMPLE)
+ {
+ status = rf->callback.cb_simple ();
+ }
+ else
+ {
+ assert (rf->type == RF_COMPLEX);
+ status = rf->callback.cb_complex (&rf->udata);
+ }
done++;
if (status != 0)
return ((void *) 0);
} /* void *plugin_read_thread */
-static void *plugin_dispatch_thread (void *arg)
+static void start_read_threads (int num)
{
- pthread_mutex_lock (&dispatch_lock);
-
- while ((dispatch_loop != 0) || (dispatch_head != NULL))
- {
- dispatch_queue_t *qi;
-
- if (dispatch_head == NULL)
- pthread_cond_wait (&dispatch_cond, &dispatch_lock);
+ int i;
- if (dispatch_head == NULL)
- continue;
+ if (read_threads != NULL)
+ return;
- qi = dispatch_head;
+ read_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+ if (read_threads == NULL)
+ {
+ ERROR ("plugin: start_read_threads: calloc failed.");
+ return;
+ }
- if (dispatch_head == dispatch_tail)
+ read_threads_num = 0;
+ for (i = 0; i < num; i++)
+ {
+ if (pthread_create (read_threads + read_threads_num, NULL,
+ plugin_read_thread, NULL) == 0)
{
- dispatch_head = NULL;
- dispatch_tail = NULL;
+ read_threads_num++;
}
else
{
- dispatch_head = qi->next;
+ ERROR ("plugin: start_read_threads: pthread_create failed.");
+ return;
}
+ } /* for (i) */
+} /* void start_read_threads */
- pthread_mutex_unlock (&dispatch_lock);
-
- plugin_dispatch_values (qi->vl);
- sfree (qi->vl->values);
- sfree (qi->vl);
- sfree (qi);
-
- pthread_mutex_lock (&dispatch_lock);
- } /* while */
-
- pthread_mutex_unlock (&dispatch_lock);
-
- return ((void *) 0);
-} /* void *plugin_dispatch_thread */
-
-static int plugin_start_threads (size_t num,
- pthread_t **tlist, size_t *tlist_num,
- void *(*thread_main) (void *))
+static void stop_read_threads (void)
{
- size_t i;
- pthread_t *new_tlist;
- size_t new_tlist_num;
+ int i;
- if (*tlist != NULL)
- return (-1);
+ if (read_threads == NULL)
+ return;
- new_tlist = (pthread_t *) calloc (num, sizeof (*new_tlist));
- if (new_tlist == NULL)
- {
- ERROR ("plugin: plugin_start_threads: calloc failed.");
- return (-1);
- }
+ pthread_mutex_lock (&read_lock);
+ read_loop = 0;
+ DEBUG ("plugin: stop_read_threads: Signalling `read_cond'");
+ pthread_cond_broadcast (&read_cond);
+ pthread_mutex_unlock (&read_lock);
- new_tlist_num = 0;
- for (i = 0; i < num; i++)
+ for (i = 0; i < read_threads_num; i++)
{
- int status;
-
- status = pthread_create (new_tlist + new_tlist_num,
- /* attr = */ NULL, thread_main,
- /* arg = */ (void *) 0);
- if (status != 0)
+ if (pthread_join (read_threads[i], NULL) != 0)
{
- ERROR ("plugin: plugin_start_threads: "
- "pthread_create failed.");
- continue;
+ ERROR ("plugin: stop_read_threads: pthread_join failed.");
}
-
- new_tlist_num++;
- } /* for (i) */
-
- if (new_tlist_num < 1)
- {
- ERROR ("plugin: plugin_start_threads: "
- "Creating threads failed.");
- sfree (new_tlist);
- return (-1);
+ read_threads[i] = (pthread_t) 0;
}
+ sfree (read_threads);
+ read_threads_num = 0;
+} /* void stop_read_threads */
- *tlist = new_tlist;
- *tlist_num = new_tlist_num;
-
- return (0);
-} /* int plugin_start_threads */
-
-static int plugin_stop_threads (int *loop,
- pthread_mutex_t *lock, pthread_cond_t *cond,
- pthread_t **ret_tlist, size_t *ret_tlist_len)
+static int remove_read_functions (void)
{
- pthread_t *tlist;
- size_t tlist_len;
- size_t i;
+ llentry_t *this;
- tlist = *ret_tlist;
- tlist_len = *ret_tlist_len;
-
- if (tlist == NULL)
+ if (list_read == NULL)
return (0);
- DEBUG ("plugin_stop_threads: Stopping %zu threads.", tlist_len);
-
- pthread_mutex_lock (lock);
+ this = llist_head (list_read);
+ while (this != NULL)
+ {
+ llentry_t *next;
+ read_func_t *rf;
- *loop = 0;
+ next = this->next;
+ rf = (read_func_t *) this->value;
- pthread_cond_broadcast (cond);
- pthread_mutex_unlock (lock);
+ free (this->key);
- for (i = 0; i < tlist_len; i++)
- {
- int status;
+ plugin_user_data_destroy (&rf->udata);
+ free (rf);
- status = pthread_join (tlist[i], NULL);
- if (status != 0)
- {
- ERROR ("plugin_stop_threads: pthread_join failed.");
- }
+ this = next;
}
- sfree (tlist);
-
- *ret_tlist = NULL;
- *ret_tlist_len = 0;
+ llist_destroy (list_read);
+ list_read = NULL;
return (0);
-} /* int plugin_stop_threads */
+} /* }}} int remove_read_functions */
/*
* Public functions
return (-1);
}
- memset (rf, '\0', sizeof (read_func_t));
+ memset (rf, 0, sizeof (read_func_t));
rf->wait_time = interval_g;
rf->wait_left = 0;
- rf->callback = callback;
+ rf->type = RF_SIMPLE;
+ rf->callback.cb_simple = callback;
rf->needs_read = DONE;
+ rf->udata.data = NULL;
+ rf->udata.free_func = NULL;
return (register_callback (&list_read, name, (void *) rf));
} /* int plugin_register_read */
+int plugin_register_complex_read (const char *name,
+ plugin_read_cb callback, user_data_t *user_data)
+{
+ read_func_t *rf;
+
+ rf = (read_func_t *) malloc (sizeof (read_func_t));
+ if (rf == NULL)
+ {
+ ERROR ("plugin_register_complex_read: malloc failed.");
+ return (-1);
+ }
+
+ memset (rf, 0, sizeof (read_func_t));
+ rf->wait_time = interval_g;
+ rf->wait_left = 0;
+ rf->type = RF_COMPLEX;
+ rf->callback.cb_complex = callback;
+ rf->needs_read = DONE;
+
+ /* Set user data */
+ if (user_data == NULL)
+ {
+ rf->udata.data = NULL;
+ rf->udata.free_func = NULL;
+ }
+ else
+ {
+ rf->udata = *user_data;
+ }
+
+ return (register_callback (&list_read, name, (void *) rf));
+} /* int plugin_register_complex_read */
+
int plugin_register_write (const char *name,
- int (*callback) (const data_set_t *ds, const value_list_t *vl))
+ plugin_write_cb callback, user_data_t *user_data)
{
- return (register_callback (&list_write, name, (void *) callback));
+ write_func_t *wr;
+
+ wr = (write_func_t *) malloc (sizeof (*wr));
+ if (wr == NULL)
+ {
+ ERROR ("plugin_register_write: malloc failed.");
+ return (-1);
+ }
+ memset (wr, 0, sizeof (*wr));
+
+ wr->callback = callback;
+ if (user_data == NULL)
+ {
+ wr->udata.data = NULL;
+ wr->udata.free_func = NULL;
+ }
+ else
+ {
+ wr->udata = *user_data;
+ }
+
+ return (register_callback (&list_write, name, (void *) wr));
} /* int plugin_register_write */
int plugin_register_flush (const char *name,
int plugin_unregister_read (const char *name)
{
llentry_t *e;
+ read_func_t *rf;
e = llist_search (list_read, name);
return (-1);
llist_remove (list_read, e);
- free (e->value);
+
+ rf = (read_func_t *) e->value;
+ plugin_user_data_destroy (&rf->udata);
+ free (rf);
free (e->key);
+
llentry_destroy (e);
return (0);
int plugin_unregister_write (const char *name)
{
- return (plugin_unregister (list_write, name));
+ llentry_t *e;
+ write_func_t *wf;
+
+ e = llist_search (list_write, name);
+
+ if (e == NULL)
+ return (-1);
+
+ llist_remove (list_write, e);
+
+ wf = (write_func_t *) e->value;
+ plugin_user_data_destroy (&wf->udata);
+ free (wf);
+ free (e->key);
+
+ llentry_destroy (e);
+
+ return (0);
}
int plugin_unregister_flush (const char *name)
rt = global_option_get ("ReadThreads");
num = atoi (rt);
if (num != -1)
- {
- read_loop = 1;
- plugin_start_threads ((num > 0) ? ((size_t ) num) : 5,
- &read_threads, &read_threads_num,
- plugin_read_thread);
- }
+ start_read_threads ((num > 0) ? num : 5);
}
-
- dispatch_loop = 1;
- plugin_start_threads (/* num = */ 1, /* FIXME: Make this number configurable */
- &dispatch_threads, &dispatch_threads_num,
- plugin_dispatch_thread);
} /* void plugin_init_all */
void plugin_read_all (void)
le = le->next)
{
rf = (read_func_t *) le->value;
- status = rf->callback ();
+
+ if (rf->type == RF_SIMPLE)
+ {
+ status = rf->callback.cb_simple ();
+ }
+ else
+ {
+ assert (rf->type == RF_COMPLEX);
+ status = rf->callback.cb_complex (&rf->udata);
+ }
+
if (status != 0)
{
NOTICE ("read-function of plugin `%s' failed.",
int plugin_write (const char *plugin, /* {{{ */
const data_set_t *ds, const value_list_t *vl)
{
- int (*callback) (const data_set_t *ds, const value_list_t *vl);
llentry_t *le;
int status;
le = llist_head (list_write);
while (le != NULL)
{
- callback = le->value;
- status = (*callback) (ds, vl);
+ write_func_t *wf = le->value;
+
+ DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
+ status = wf->callback (ds, vl, &wf->udata);
if (status != 0)
failure++;
else
}
else /* plugin != NULL */
{
+ write_func_t *wf;
le = llist_head (list_write);
while (le != NULL)
{
if (le == NULL)
return (ENOENT);
- callback = le->value;
- status = (*callback) (ds, vl);
+ wf = le->value;
+
+ DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
+ status = wf->callback (ds, vl, &wf->udata);
}
return (status);
int (*callback) (void);
llentry_t *le;
- plugin_stop_threads (&read_loop, &read_lock, &read_cond,
- &read_threads, &read_threads_num);
-
- plugin_stop_threads (&dispatch_loop, &dispatch_lock, &dispatch_cond,
- &dispatch_threads, &dispatch_threads_num);
+ stop_read_threads ();
+ remove_read_functions ();
if (list_shutdown == NULL)
return;
return (0);
} /* int plugin_dispatch_values */
-int plugin_dispatch_values_async (const value_list_t *vl)
-{
- dispatch_queue_t *qi;
- int i;
-
- if (vl == NULL)
- return (-EINVAL);
-
- if (dispatch_threads_num < 1)
- {
- ERROR ("plugin_dispatch_values_async: "
- "No dispatch threads have been started!");
-#ifdef ENOTCONN
- return (-ENOTCONN);
-#else
- return (-1);
-#endif
- }
-
- qi = (dispatch_queue_t *) malloc (sizeof (*qi));
- if (qi == NULL)
- {
- ERROR ("plugin_dispatch_values_async: malloc failed.");
- return (-ENOMEM);
- }
- memset (qi, 0, sizeof (*qi));
- qi->next = NULL;
-
- qi->vl = (value_list_t *) malloc (sizeof (value_list_t));
- if (qi->vl == NULL)
- {
- ERROR ("plugin_dispatch_values_async: malloc failed.");
- sfree (qi);
- return (-ENOMEM);
- }
- memcpy (qi->vl, vl, sizeof (value_list_t));
- qi->vl->values = NULL;
-
- qi->vl->values = (value_t *) calloc (qi->vl->values_len,
- sizeof (value_t));
- if (qi->vl->values == NULL)
- {
- ERROR ("plugin_dispatch_values_async: malloc failed.");
- sfree (qi->vl);
- sfree (qi);
- return (-ENOMEM);
- }
-
- for (i = 0; i < vl->values_len; i++)
- qi->vl->values[i] = vl->values[i];
-
- pthread_mutex_lock (&dispatch_lock);
-
- if (dispatch_tail == NULL)
- {
- dispatch_head = qi;
- dispatch_tail = qi;
- }
- else
- {
- dispatch_tail->next = qi;
- dispatch_tail = qi;
- }
-
- pthread_cond_signal (&dispatch_cond);
- pthread_mutex_unlock (&dispatch_lock);
-
- return (0);
-} /* int plugin_dispatch_values_async */
-
int plugin_dispatch_notification (const notification_t *notif)
{
int (*callback) (const notification_t *);