Merge branch 'collectd-4.10' into collectd-5.3
[collectd.git] / src / plugin.c
index f4461b5..ed962a7 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/plugin.c
- * Copyright (C) 2005-2009  Florian octo Forster
+ * Copyright (C) 2005-2013  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  *
  * Authors:
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  *   Sebastian Harl <sh at tokkee.org>
  **/
 
 #include "collectd.h"
-#include "utils_complain.h"
-
-#include <ltdl.h>
-
-#if HAVE_PTHREAD_H
-# include <pthread.h>
-#endif
-
 #include "common.h"
 #include "plugin.h"
 #include "configfile.h"
+#include "filter_chain.h"
 #include "utils_avltree.h"
+#include "utils_cache.h"
+#include "utils_complain.h"
 #include "utils_llist.h"
 #include "utils_heap.h"
-#include "utils_cache.h"
-#include "utils_threshold.h"
-#include "filter_chain.h"
+#include "utils_time.h"
+
+#if HAVE_PTHREAD_H
+# include <pthread.h>
+#endif
+
+#include <ltdl.h>
 
 /*
  * Private structures
@@ -46,6 +45,7 @@ struct callback_func_s
 {
        void *cf_callback;
        user_data_t cf_udata;
+       plugin_ctx_t cf_ctx;
 };
 typedef struct callback_func_s callback_func_t;
 
@@ -58,22 +58,33 @@ struct read_func_s
         * The `rf_super' member MUST be the first one in this structure! */
 #define rf_callback rf_super.cf_callback
 #define rf_udata rf_super.cf_udata
+#define rf_ctx rf_super.cf_ctx
        callback_func_t rf_super;
        char rf_group[DATA_MAX_NAME_LEN];
        char rf_name[DATA_MAX_NAME_LEN];
        int rf_type;
-       struct timespec rf_interval;
-       struct timespec rf_effective_interval;
-       struct timespec rf_next_read;
+       cdtime_t rf_interval;
+       cdtime_t rf_effective_interval;
+       cdtime_t rf_next_read;
 };
 typedef struct read_func_s read_func_t;
 
+struct write_queue_s;
+typedef struct write_queue_s write_queue_t;
+struct write_queue_s
+{
+       value_list_t *vl;
+       plugin_ctx_t ctx;
+       write_queue_t *next;
+};
+
 /*
  * Private variables
  */
 static llist_t *list_init;
 static llist_t *list_write;
 static llist_t *list_flush;
+static llist_t *list_missing;
 static llist_t *list_shutdown;
 static llist_t *list_log;
 static llist_t *list_notification;
@@ -93,9 +104,22 @@ static pthread_cond_t  read_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t      *read_threads = NULL;
 static int             read_threads_num = 0;
 
+static write_queue_t  *write_queue_head;
+static write_queue_t  *write_queue_tail;
+static _Bool           write_loop = 1;
+static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t      *write_threads = NULL;
+static size_t          write_threads_num = 0;
+
+static pthread_key_t   plugin_ctx_key;
+static _Bool           plugin_ctx_key_initialized = 0;
+
 /*
  * Static functions
  */
+static int plugin_dispatch_values_internal (value_list_t *vl);
+
 static const char *plugin_get_dir (void)
 {
        if (plugindir == NULL)
@@ -246,6 +270,8 @@ static int create_register_callback (llist_t **list, /* {{{ */
                cf->cf_udata = *ud;
        }
 
+       cf->cf_ctx = plugin_get_ctx ();
+
        return (register_callback (list, name, cf));
 } /* }}} int create_register_callback */
 
@@ -291,7 +317,7 @@ static int plugin_load_file (char *file, uint32_t flags)
                dlh = lt_dlopenadvise(file, advise);
                lt_dladvise_destroy(&advise);
        } else {
-               dlh = lt_dlopen (file);
+               dlh = lt_dlopen (file);
        }
 #else /* if LIBTOOL_VERSION == 1 */
        if (flags & PLUGIN_FLAGS_GLOBAL)
@@ -334,52 +360,40 @@ static int plugin_load_file (char *file, uint32_t flags)
        return (0);
 }
 
-static _Bool timeout_reached(struct timespec timeout)
-{
-       struct timeval now;
-       gettimeofday(&now, NULL);
-       return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000));
-}
-
 static void *plugin_read_thread (void __attribute__((unused)) *args)
 {
        while (read_loop != 0)
        {
                read_func_t *rf;
-               struct timeval now;
+               plugin_ctx_t old_ctx;
+               cdtime_t now;
                int status;
                int rf_type;
                int rc;
 
-               /* Get the read function that needs to be read next. */
+               /* Get the read function that needs to be read next.
+                * We don't need to hold "read_lock" for the heap, but we need
+                * to call c_heap_get_root() and pthread_cond_wait() in the
+                * same protected block. */
+               pthread_mutex_lock (&read_lock);
                rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                {
-                       struct timespec abstime;
-
-                       gettimeofday (&now, /* timezone = */ NULL);
-
-                       abstime.tv_sec = now.tv_sec + interval_g;
-                       abstime.tv_nsec = 1000 * now.tv_usec;
-
-                       pthread_mutex_lock (&read_lock);
-                       pthread_cond_timedwait (&read_cond, &read_lock,
-                                       &abstime);
-                       pthread_mutex_unlock (&read_lock);
+                       pthread_cond_wait (&read_cond, &read_lock);
+                        pthread_mutex_unlock (&read_lock);
                        continue;
                }
+               pthread_mutex_unlock (&read_lock);
 
-               if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0))
+               if (rf->rf_interval == 0)
                {
-                       gettimeofday (&now, /* timezone = */ NULL);
-
-                       rf->rf_interval.tv_sec = interval_g;
-                       rf->rf_interval.tv_nsec = 0;
-
+                       /* this should not happen, because the interval is set
+                        * for each plugin when loading it
+                        * XXX: issue a warning? */
+                       rf->rf_interval = plugin_get_interval ();
                        rf->rf_effective_interval = rf->rf_interval;
 
-                       rf->rf_next_read.tv_sec = now.tv_sec;
-                       rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
+                       rf->rf_next_read = cdtime ();
                }
 
                /* sleep until this entry is due,
@@ -391,11 +405,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                 * pthread_cond_timedwait returns. */
                rc = 0;
                while ((read_loop != 0)
-                               && !timeout_reached(rf->rf_next_read)
+                               && (cdtime () < rf->rf_next_read)
                                && rc == 0)
                {
+                       struct timespec ts = { 0 };
+
+                       CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts);
+
                        rc = pthread_cond_timedwait (&read_cond, &read_lock,
-                               &rf->rf_next_read);
+                               &ts);
                }
 
                /* Must hold `read_lock' when accessing `rf->rf_type'. */
@@ -426,6 +444,8 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
 
                DEBUG ("plugin_read_thread: Handling `%s'.", rf->rf_name);
 
+               old_ctx = plugin_set_ctx (rf->rf_ctx);
+
                if (rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
@@ -443,24 +463,20 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                        status = (*callback) (&rf->rf_udata);
                }
 
+               plugin_set_ctx (old_ctx);
+
                /* If the function signals failure, we will increase the
                 * intervals in which it will be called. */
                if (status != 0)
                {
-                       rf->rf_effective_interval.tv_sec *= 2;
-                       rf->rf_effective_interval.tv_nsec *= 2;
-                       NORMALIZE_TIMESPEC (rf->rf_effective_interval);
-
-                       if (rf->rf_effective_interval.tv_sec >= 86400)
-                       {
-                               rf->rf_effective_interval.tv_sec = 86400;
-                               rf->rf_effective_interval.tv_nsec = 0;
-                       }
+                       rf->rf_effective_interval *= 2;
+                       if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400))
+                               rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400);
 
                        NOTICE ("read-function of plugin `%s' failed. "
-                                       "Will suspend it for %i seconds.",
+                                       "Will suspend it for %.3f seconds.",
                                        rf->rf_name,
-                                       (int) rf->rf_effective_interval.tv_sec);
+                                       CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
                }
                else
                {
@@ -469,38 +485,29 @@ static void *plugin_read_thread (void __attribute__((unused)) *args)
                }
 
                /* update the ``next read due'' field */
-               gettimeofday (&now, /* timezone = */ NULL);
+               now = cdtime ();
 
                DEBUG ("plugin_read_thread: Effective interval of the "
-                               "%s plugin is %i.%09i.",
+                               "%s plugin is %.3f seconds.",
                                rf->rf_name,
-                               (int) rf->rf_effective_interval.tv_sec,
-                               (int) rf->rf_effective_interval.tv_nsec);
+                               CDTIME_T_TO_DOUBLE (rf->rf_effective_interval));
 
                /* Calculate the next (absolute) time at which this function
                 * should be called. */
-               rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec
-                       + rf->rf_effective_interval.tv_sec;
-               rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec
-                       + rf->rf_effective_interval.tv_nsec;
-               NORMALIZE_TIMESPEC (rf->rf_next_read);
+               rf->rf_next_read += rf->rf_effective_interval;
 
                /* Check, if `rf_next_read' is in the past. */
-               if ((rf->rf_next_read.tv_sec < now.tv_sec)
-                               || ((rf->rf_next_read.tv_sec == now.tv_sec)
-                                       && (rf->rf_next_read.tv_nsec < (1000 * now.tv_usec))))
+               if (rf->rf_next_read < now)
                {
                        /* `rf_next_read' is in the past. Insert `now'
                         * so this value doesn't trail off into the
                         * past too much. */
-                       rf->rf_next_read.tv_sec = now.tv_sec;
-                       rf->rf_next_read.tv_nsec = 1000 * now.tv_usec;
+                       rf->rf_next_read = now;
                }
 
-               DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.",
+               DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.",
                                rf->rf_name,
-                               (int) rf->rf_next_read.tv_sec,
-                               (int) rf->rf_next_read.tv_nsec);
+                               CDTIME_T_TO_DOUBLE (rf->rf_next_read));
 
                /* Re-insert this read function into the heap again. */
                c_heap_insert (read_heap, rf);
@@ -567,6 +574,244 @@ static void stop_read_threads (void)
        read_threads_num = 0;
 } /* void stop_read_threads */
 
+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+       if (vl == NULL)
+               return;
+
+       meta_data_destroy (vl->meta);
+       sfree (vl->values);
+       sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+       value_list_t *vl;
+
+       if (vl_orig == NULL)
+               return (NULL);
+
+       vl = malloc (sizeof (*vl));
+       if (vl == NULL)
+               return (NULL);
+       memcpy (vl, vl_orig, sizeof (*vl));
+
+       vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+       if (vl->values == NULL)
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+       memcpy (vl->values, vl_orig->values,
+                       vl_orig->values_len * sizeof (*vl->values));
+
+       vl->meta = meta_data_clone (vl->meta);
+       if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+       {
+               plugin_value_list_free (vl);
+               return (NULL);
+       }
+
+       if (vl->time == 0)
+               vl->time = cdtime ();
+
+       /* Fill in the interval from the thread context, if it is zero. */
+       if (vl->interval == 0)
+       {
+               plugin_ctx_t ctx = plugin_get_ctx ();
+
+               if (ctx.interval != 0)
+                       vl->interval = ctx.interval;
+               else
+               {
+                       char name[6 * DATA_MAX_NAME_LEN];
+                       FORMAT_VL (name, sizeof (name), vl);
+                       ERROR ("plugin_value_list_clone: Unable to determine "
+                                       "interval from context for "
+                                       "value list \"%s\". "
+                                       "This indicates a broken plugin. "
+                                       "Please report this problem to the "
+                                       "collectd mailing list or at "
+                                       "<http://collectd.org/bugs/>.", name);
+                       vl->interval = cf_get_default_interval ();
+               }
+       }
+
+       return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+       write_queue_t *q;
+
+       q = malloc (sizeof (*q));
+       if (q == NULL)
+               return (ENOMEM);
+       q->next = NULL;
+
+       q->vl = plugin_value_list_clone (vl);
+       if (q->vl == NULL)
+       {
+               sfree (q);
+               return (ENOMEM);
+       }
+
+       /* Store context of caller (read plugin); otherwise, it would not be
+        * available to the write plugins when actually dispatching the
+        * value-list later on. */
+       q->ctx = plugin_get_ctx ();
+
+       pthread_mutex_lock (&write_lock);
+
+       if (write_queue_tail == NULL)
+       {
+               write_queue_head = q;
+               write_queue_tail = q;
+       }
+       else
+       {
+               write_queue_tail->next = q;
+               write_queue_tail = q;
+       }
+
+       pthread_cond_signal (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+       write_queue_t *q;
+       value_list_t *vl;
+
+       pthread_mutex_lock (&write_lock);
+
+       while (write_loop && (write_queue_head == NULL))
+               pthread_cond_wait (&write_cond, &write_lock);
+
+       if (write_queue_head == NULL)
+       {
+               pthread_mutex_unlock (&write_lock);
+               return (NULL);
+       }
+
+       q = write_queue_head;
+       write_queue_head = q->next;
+       if (write_queue_head == NULL)
+               write_queue_tail = NULL;
+
+       pthread_mutex_unlock (&write_lock);
+
+       (void) plugin_set_ctx (q->ctx);
+
+       vl = q->vl;
+       sfree (q);
+       return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+       while (write_loop)
+       {
+               value_list_t *vl = plugin_write_dequeue ();
+               if (vl == NULL)
+                       continue;
+
+               plugin_dispatch_values_internal (vl);
+
+               plugin_value_list_free (vl);
+       }
+
+       pthread_exit (NULL);
+       return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+       size_t i;
+
+       if (write_threads != NULL)
+               return;
+
+       write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+       if (write_threads == NULL)
+       {
+               ERROR ("plugin: start_write_threads: calloc failed.");
+               return;
+       }
+
+       write_threads_num = 0;
+       for (i = 0; i < num; i++)
+       {
+               int status;
+
+               status = pthread_create (write_threads + write_threads_num,
+                               /* attr = */ NULL,
+                               plugin_write_thread,
+                               /* arg = */ NULL);
+               if (status != 0)
+               {
+                       char errbuf[1024];
+                       ERROR ("plugin: start_write_threads: pthread_create failed "
+                                       "with status %i (%s).", status,
+                                       sstrerror (status, errbuf, sizeof (errbuf)));
+                       return;
+               }
+
+               write_threads_num++;
+       } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+       write_queue_t *q;
+       int i;
+
+       if (write_threads == NULL)
+               return;
+
+       INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+       pthread_mutex_lock (&write_lock);
+       write_loop = 0;
+       DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+       pthread_cond_broadcast (&write_cond);
+       pthread_mutex_unlock (&write_lock);
+
+       for (i = 0; i < write_threads_num; i++)
+       {
+               if (pthread_join (write_threads[i], NULL) != 0)
+               {
+                       ERROR ("plugin: stop_write_threads: pthread_join failed.");
+               }
+               write_threads[i] = (pthread_t) 0;
+       }
+       sfree (write_threads);
+       write_threads_num = 0;
+
+       pthread_mutex_lock (&write_lock);
+       i = 0;
+       for (q = write_queue_head; q != NULL; )
+       {
+               write_queue_t *q1 = q;
+               plugin_value_list_free (q->vl);
+               q = q->next;
+               sfree (q1);
+               i++;
+       }
+       write_queue_head = NULL;
+       write_queue_tail = NULL;
+       pthread_mutex_unlock (&write_lock);
+
+       if (i > 0)
+       {
+               WARNING ("plugin: %i value list%s left after shutting down "
+                               "the write threads.",
+                               i, (i == 1) ? " was" : "s were");
+       }
+} /* }}} void stop_write_threads */
+
 /*
  * Public functions
  */
@@ -598,8 +843,6 @@ int plugin_load (const char *type, uint32_t flags)
        struct dirent *de;
        int status;
 
-       DEBUG ("type = %s", type);
-
        dir = plugin_get_dir ();
        ret = 1;
 
@@ -608,7 +851,7 @@ int plugin_load (const char *type, uint32_t flags)
        status = ssnprintf (typename, sizeof (typename), "%s.so", type);
        if ((status < 0) || ((size_t) status >= sizeof (typename)))
        {
-               WARNING ("snprintf: truncated: `%s.so'", type);
+               WARNING ("plugin_load: Filename too long: \"%s.so\"", type);
                return (-1);
        }
        typename_len = strlen (typename);
@@ -616,7 +859,7 @@ int plugin_load (const char *type, uint32_t flags)
        if ((dh = opendir (dir)) == NULL)
        {
                char errbuf[1024];
-               ERROR ("opendir (%s): %s", dir,
+               ERROR ("plugin_load: opendir (%s) failed: %s", dir,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                return (-1);
        }
@@ -630,25 +873,29 @@ int plugin_load (const char *type, uint32_t flags)
                                "%s/%s", dir, de->d_name);
                if ((status < 0) || ((size_t) status >= sizeof (filename)))
                {
-                       WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name);
+                       WARNING ("plugin_load: Filename too long: \"%s/%s\"",
+                                       dir, de->d_name);
                        continue;
                }
 
                if (lstat (filename, &statbuf) == -1)
                {
                        char errbuf[1024];
-                       WARNING ("stat %s: %s", filename,
+                       WARNING ("plugin_load: stat (\"%s\") failed: %s",
+                                       filename,
                                        sstrerror (errno, errbuf, sizeof (errbuf)));
                        continue;
                }
                else if (!S_ISREG (statbuf.st_mode))
                {
                        /* don't follow symlinks */
-                       WARNING ("stat %s: not a regular file", filename);
+                       WARNING ("plugin_load: %s is not a regular file.",
+                                       filename);
                        continue;
                }
 
-               if (plugin_load_file (filename, flags) == 0)
+               status = plugin_load_file (filename, flags);
+               if (status == 0)
                {
                        /* success */
                        ret = 0;
@@ -656,14 +903,16 @@ int plugin_load (const char *type, uint32_t flags)
                }
                else
                {
-                       fprintf (stderr, "Unable to load plugin %s.\n", type);
+                       ERROR ("plugin_load: Load plugin \"%s\" failed with "
+                                       "status %i.", type, status);
                }
        }
 
        closedir (dh);
 
-       if (filename[0] == '\0')
-               fprintf (stderr, "Could not find plugin %s.\n", type);
+       if (filename[0] == 0)
+               ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
+                               type, dir);
 
        return (ret);
 }
@@ -700,13 +949,9 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1)
        rf0 = arg0;
        rf1 = arg1;
 
-       if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec)
+       if (rf0->rf_next_read < rf1->rf_next_read)
                return (-1);
-       else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec)
-               return (1);
-       else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec)
-               return (-1);
-       else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec)
+       else if (rf0->rf_next_read > rf1->rf_next_read)
                return (1);
        else
                return (0);
@@ -720,6 +965,9 @@ static int plugin_insert_read (read_func_t *rf)
        int status;
        llentry_t *le;
 
+       rf->rf_next_read = cdtime ();
+       rf->rf_effective_interval = rf->rf_interval;
+
        pthread_mutex_lock (&read_lock);
 
        if (read_list == NULL)
@@ -744,6 +992,17 @@ static int plugin_insert_read (read_func_t *rf)
                }
        }
 
+       le = llist_search (read_list, rf->rf_name);
+       if (le != NULL)
+       {
+               pthread_mutex_unlock (&read_lock);
+               WARNING ("The read function \"%s\" is already registered. "
+                               "Check for duplicate \"LoadPlugin\" lines "
+                               "in your configuration!",
+                               rf->rf_name);
+               return (EINVAL);
+       }
+
        le = llentry_create (rf->rf_name, rf);
        if (le == NULL)
        {
@@ -764,6 +1023,8 @@ static int plugin_insert_read (read_func_t *rf)
        /* This does not fail. */
        llist_append (read_list, le);
 
+       /* Wake up all the read threads. */
+       pthread_cond_broadcast (&read_cond);
        pthread_mutex_unlock (&read_lock);
        return (0);
 } /* int plugin_insert_read */
@@ -772,28 +1033,30 @@ int plugin_register_read (const char *name,
                int (*callback) (void))
 {
        read_func_t *rf;
+       int status;
 
-       rf = (read_func_t *) malloc (sizeof (read_func_t));
+       rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
-               char errbuf[1024];
-               ERROR ("plugin_register_read: malloc failed: %s",
-                               sstrerror (errno, errbuf, sizeof (errbuf)));
-               return (-1);
+               ERROR ("plugin_register_read: malloc failed.");
+               return (ENOMEM);
        }
 
        memset (rf, 0, sizeof (read_func_t));
        rf->rf_callback = (void *) callback;
        rf->rf_udata.data = NULL;
        rf->rf_udata.free_func = NULL;
+       rf->rf_ctx = plugin_get_ctx ();
        rf->rf_group[0] = '\0';
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_SIMPLE;
-       rf->rf_interval.tv_sec = 0;
-       rf->rf_interval.tv_nsec = 0;
-       rf->rf_effective_interval = rf->rf_interval;
+       rf->rf_interval = plugin_get_interval ();
 
-       return (plugin_insert_read (rf));
+       status = plugin_insert_read (rf);
+       if (status != 0)
+               sfree (rf);
+
+       return (status);
 } /* int plugin_register_read */
 
 int plugin_register_complex_read (const char *group, const char *name,
@@ -802,12 +1065,13 @@ int plugin_register_complex_read (const char *group, const char *name,
                user_data_t *user_data)
 {
        read_func_t *rf;
+       int status;
 
-       rf = (read_func_t *) malloc (sizeof (read_func_t));
+       rf = malloc (sizeof (*rf));
        if (rf == NULL)
        {
                ERROR ("plugin_register_complex_read: malloc failed.");
-               return (-1);
+               return (ENOMEM);
        }
 
        memset (rf, 0, sizeof (read_func_t));
@@ -819,10 +1083,9 @@ int plugin_register_complex_read (const char *group, const char *name,
        sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
        rf->rf_type = RF_COMPLEX;
        if (interval != NULL)
-       {
-               rf->rf_interval = *interval;
-       }
-       rf->rf_effective_interval = rf->rf_interval;
+               rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval);
+       else
+               rf->rf_interval = plugin_get_interval ();
 
        /* Set user data */
        if (user_data == NULL)
@@ -835,7 +1098,13 @@ int plugin_register_complex_read (const char *group, const char *name,
                rf->rf_udata = *user_data;
        }
 
-       return (plugin_insert_read (rf));
+       rf->rf_ctx = plugin_get_ctx ();
+
+       status = plugin_insert_read (rf);
+       if (status != 0)
+               sfree (rf);
+
+       return (status);
 } /* int plugin_register_complex_read */
 
 int plugin_register_write (const char *name,
@@ -852,6 +1121,13 @@ int plugin_register_flush (const char *name,
                                (void *) callback, ud));
 } /* int plugin_register_flush */
 
+int plugin_register_missing (const char *name,
+               plugin_missing_cb callback, user_data_t *ud)
+{
+       return (create_register_callback (&list_missing, name,
+                               (void *) callback, ud));
+} /* int plugin_register_missing */
+
 int plugin_register_shutdown (const char *name,
                int (*callback) (void))
 {
@@ -859,6 +1135,27 @@ int plugin_register_shutdown (const char *name,
                                (void *) callback, /* user_data = */ NULL));
 } /* int plugin_register_shutdown */
 
+static void plugin_free_data_sets (void)
+{
+       void *key;
+       void *value;
+
+       if (data_sets == NULL)
+               return;
+
+       while (c_avl_pick (data_sets, &key, &value) == 0)
+       {
+               data_set_t *ds = value;
+               /* key is a pointer to ds->type */
+
+               sfree (ds->ds);
+               sfree (ds);
+       }
+
+       c_avl_destroy (data_sets);
+       data_sets = NULL;
+} /* void plugin_free_data_sets */
+
 int plugin_register_data_set (const data_set_t *ds)
 {
        data_set_t *ds_copy;
@@ -1038,6 +1335,11 @@ int plugin_unregister_flush (const char *name)
        return (plugin_unregister (list_flush, name));
 }
 
+int plugin_unregister_missing (const char *name)
+{
+       return (plugin_unregister (list_missing, name));
+}
+
 int plugin_unregister_shutdown (const char *name)
 {
        return (plugin_unregister (list_shutdown, name));
@@ -1084,6 +1386,15 @@ void plugin_init_all (void)
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
 
+       {
+               char const *tmp = global_option_get ("WriteThreads");
+               int num = atoi (tmp);
+
+               if (num < 1)
+                       num = 5;
+
+               start_write_threads ((size_t) num);
+       }
 
        if ((list_init == NULL) && (read_heap == NULL))
                return;
@@ -1096,10 +1407,13 @@ void plugin_init_all (void)
        {
                callback_func_t *cf;
                plugin_init_cb callback;
+               plugin_ctx_t old_ctx;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
                status = (*callback) ();
+               plugin_set_ctx (old_ctx);
 
                if (status != 0)
                {
@@ -1152,11 +1466,14 @@ int plugin_read_all_once (void)
        while (42)
        {
                read_func_t *rf;
+               plugin_ctx_t old_ctx;
 
                rf = c_heap_get_root (read_heap);
                if (rf == NULL)
                        break;
 
+               old_ctx = plugin_set_ctx (rf->rf_ctx);
+
                if (rf->rf_type == RF_SIMPLE)
                {
                        int (*callback) (void);
@@ -1172,6 +1489,8 @@ int plugin_read_all_once (void)
                        status = (*callback) (&rf->rf_udata);
                }
 
+               plugin_set_ctx (old_ctx);
+
                if (status != 0)
                {
                        NOTICE ("read-function of plugin `%s' failed.",
@@ -1218,6 +1537,9 @@ int plugin_write (const char *plugin, /* {{{ */
       callback_func_t *cf = le->value;
       plugin_write_cb callback;
 
+      /* do not switch plugin context; rather keep the context (interval)
+       * information of the calling read plugin */
+
       DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
       callback = cf->cf_callback;
       status = (*callback) (ds, vl, &cf->cf_udata);
@@ -1253,6 +1575,9 @@ int plugin_write (const char *plugin, /* {{{ */
 
     cf = le->value;
 
+    /* do not switch plugin context; rather keep the context (interval)
+     * information of the calling read plugin */
+
     DEBUG ("plugin: plugin_write: Writing values via %s.", le->key);
     callback = cf->cf_callback;
     status = (*callback) (ds, vl, &cf->cf_udata);
@@ -1261,7 +1586,7 @@ int plugin_write (const char *plugin, /* {{{ */
   return (status);
 } /* }}} int plugin_write */
 
-int plugin_flush (const char *plugin, int timeout, const char *identifier)
+int plugin_flush (const char *plugin, cdtime_t timeout, const char *identifier)
 {
   llentry_t *le;
 
@@ -1273,6 +1598,7 @@ int plugin_flush (const char *plugin, int timeout, const char *identifier)
   {
     callback_func_t *cf;
     plugin_flush_cb callback;
+    plugin_ctx_t old_ctx;
 
     if ((plugin != NULL)
         && (strcmp (plugin, le->key) != 0))
@@ -1282,10 +1608,13 @@ int plugin_flush (const char *plugin, int timeout, const char *identifier)
     }
 
     cf = le->value;
+    old_ctx = plugin_set_ctx (cf->cf_ctx);
     callback = cf->cf_callback;
 
     (*callback) (timeout, identifier, &cf->cf_udata);
 
+    plugin_set_ctx (old_ctx);
+
     le = le->next;
   }
   return (0);
@@ -1306,7 +1635,8 @@ void plugin_shutdown_all (void)
 
        destroy_read_heap ();
 
-       plugin_flush (/* plugin = */ NULL, /* timeout = */ -1,
+       plugin_flush (/* plugin = */ NULL,
+                       /* timeout = */ 0,
                        /* identifier = */ NULL);
 
        le = NULL;
@@ -1317,8 +1647,10 @@ void plugin_shutdown_all (void)
        {
                callback_func_t *cf;
                plugin_shutdown_cb callback;
+               plugin_ctx_t old_ctx;
 
                cf = le->value;
+               old_ctx = plugin_set_ctx (cf->cf_ctx);
                callback = cf->cf_callback;
 
                /* Advance the pointer before calling the callback allows
@@ -1328,22 +1660,70 @@ void plugin_shutdown_all (void)
                le = le->next;
 
                (*callback) ();
+
+               plugin_set_ctx (old_ctx);
        }
 
+       stop_write_threads ();
+
        /* Write plugins which use the `user_data' pointer usually need the
         * same data available to the flush callback. If this is the case, set
         * the free_function to NULL when registering the flush callback and to
         * the real free function when registering the write callback. This way
         * the data isn't freed twice. */
        destroy_all_callbacks (&list_flush);
+       destroy_all_callbacks (&list_missing);
        destroy_all_callbacks (&list_write);
 
        destroy_all_callbacks (&list_notification);
        destroy_all_callbacks (&list_shutdown);
        destroy_all_callbacks (&list_log);
+
+       plugin_free_data_sets ();
 } /* void plugin_shutdown_all */
 
-int plugin_dispatch_values (value_list_t *vl)
+int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
+{
+  llentry_t *le;
+
+  if (list_missing == NULL)
+    return (0);
+
+  le = llist_head (list_missing);
+  while (le != NULL)
+  {
+    callback_func_t *cf;
+    plugin_missing_cb callback;
+    plugin_ctx_t old_ctx;
+    int status;
+
+    cf = le->value;
+    old_ctx = plugin_set_ctx (cf->cf_ctx);
+    callback = cf->cf_callback;
+
+    status = (*callback) (vl, &cf->cf_udata);
+    plugin_set_ctx (old_ctx);
+    if (status != 0)
+    {
+      if (status < 0)
+      {
+        ERROR ("plugin_dispatch_missing: Callback function \"%s\" "
+            "failed with status %i.",
+            le->key, status);
+        return (status);
+      }
+      else
+      {
+        return (0);
+      }
+    }
+
+    le = le->next;
+  }
+  return (0);
+} /* int }}} plugin_dispatch_missing */
+
+static int plugin_dispatch_values_internal (value_list_t *vl)
 {
        int status;
        static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
@@ -1394,17 +1774,17 @@ int plugin_dispatch_values (value_list_t *vl)
                return (-1);
        }
 
-       if (vl->time == 0)
-               vl->time = time (NULL);
-
-       if (vl->interval <= 0)
-               vl->interval = interval_g;
+       /* Assured by plugin_value_list_clone(). The time is determined at
+        * _enqueue_ time. */
+       assert (vl->time != 0);
+       assert (vl->interval != 0);
 
-       DEBUG ("plugin_dispatch_values: time = %u; interval = %i; "
+       DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; "
                        "host = %s; "
                        "plugin = %s; plugin_instance = %s; "
                        "type = %s; type_instance = %s;",
-                       (unsigned int) vl->time, vl->interval,
+                       CDTIME_T_TO_DOUBLE (vl->time),
+                       CDTIME_T_TO_DOUBLE (vl->interval),
                        vl->host,
                        vl->plugin, vl->plugin_instance,
                        vl->type, vl->type_instance);
@@ -1488,9 +1868,6 @@ int plugin_dispatch_values (value_list_t *vl)
        /* Update the value cache */
        uc_update (ds, vl);
 
-       /* Initiate threshold checking */
-       ut_check_threshold (ds, vl);
-
        if (post_cache_chain != NULL)
        {
                status = fc_process_chain (ds, vl, post_cache_chain);
@@ -1521,53 +1898,24 @@ int plugin_dispatch_values (value_list_t *vl)
        }
 
        return (0);
-} /* int plugin_dispatch_values */
+} /* int plugin_dispatch_values_internal */
 
-int plugin_dispatch_values_secure (const value_list_t *vl)
+int plugin_dispatch_values (value_list_t const *vl)
 {
-  value_list_t vl_copy;
-  int status;
-
-  if (vl == NULL)
-    return EINVAL;
-
-  memcpy (&vl_copy, vl, sizeof (vl_copy));
-
-  /* Write callbacks must not change the values and meta pointers, so we can
-   * savely skip copying those and make this more efficient. */
-  if ((pre_cache_chain == NULL) && (post_cache_chain == NULL))
-    return (plugin_dispatch_values (&vl_copy));
-
-  /* Set pointers to NULL, just to be on the save side. */
-  vl_copy.values = NULL;
-  vl_copy.meta = NULL;
-
-  vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len);
-  if (vl_copy.values == NULL)
-  {
-    ERROR ("plugin_dispatch_values_secure: malloc failed.");
-    return (ENOMEM);
-  }
-  memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len);
-
-  if (vl->meta != NULL)
-  {
-    vl_copy.meta = meta_data_clone (vl->meta);
-    if (vl_copy.meta == NULL)
-    {
-      ERROR ("plugin_dispatch_values_secure: meta_data_clone failed.");
-      free (vl_copy.values);
-      return (ENOMEM);
-    }
-  } /* if (vl->meta) */
-
-  status = plugin_dispatch_values (&vl_copy);
+       int status;
 
-  meta_data_destroy (vl_copy.meta);
-  free (vl_copy.values);
+       status = plugin_write_enqueue (vl);
+       if (status != 0)
+       {
+               char errbuf[1024];
+               ERROR ("plugin_dispatch_values: plugin_write_enqueue failed "
+                               "with status %i (%s).", status,
+                               sstrerror (status, errbuf, sizeof (errbuf)));
+               return (status);
+       }
 
-  return (status);
-} /* int plugin_dispatch_values_secure */
+       return (0);
+}
 
 int plugin_dispatch_notification (const notification_t *notif)
 {
@@ -1575,9 +1923,9 @@ int plugin_dispatch_notification (const notification_t *notif)
        /* Possible TODO: Add flap detection here */
 
        DEBUG ("plugin_dispatch_notification: severity = %i; message = %s; "
-                       "time = %u; host = %s;",
+                       "time = %.3f; host = %s;",
                        notif->severity, notif->message,
-                       (unsigned int) notif->time, notif->host);
+                       CDTIME_T_TO_DOUBLE (notif->time), notif->host);
 
        /* Nobody cares for notifications */
        if (list_notification == NULL)
@@ -1590,6 +1938,9 @@ int plugin_dispatch_notification (const notification_t *notif)
                plugin_notification_cb callback;
                int status;
 
+               /* do not switch plugin context; rather keep the context
+                * (interval) information of the calling plugin */
+
                cf = le->value;
                callback = cf->cf_callback;
                status = (*callback) (notif, &cf->cf_udata);
@@ -1637,16 +1988,63 @@ void plugin_log (int level, const char *format, ...)
                cf = le->value;
                callback = cf->cf_callback;
 
+               /* do not switch plugin context; rather keep the context
+                * (interval) information of the calling plugin */
+
                (*callback) (level, msg, &cf->cf_udata);
 
                le = le->next;
        }
 } /* void plugin_log */
 
+int parse_log_severity (const char *severity)
+{
+       int log_level = -1;
+
+       if ((0 == strcasecmp (severity, "emerg"))
+                       || (0 == strcasecmp (severity, "alert"))
+                       || (0 == strcasecmp (severity, "crit"))
+                       || (0 == strcasecmp (severity, "err")))
+               log_level = LOG_ERR;
+       else if (0 == strcasecmp (severity, "warning"))
+               log_level = LOG_WARNING;
+       else if (0 == strcasecmp (severity, "notice"))
+               log_level = LOG_NOTICE;
+       else if (0 == strcasecmp (severity, "info"))
+               log_level = LOG_INFO;
+#if COLLECT_DEBUG
+       else if (0 == strcasecmp (severity, "debug"))
+               log_level = LOG_DEBUG;
+#endif /* COLLECT_DEBUG */
+
+       return (log_level);
+} /* int parse_log_severity */
+
+int parse_notif_severity (const char *severity)
+{
+       int notif_severity = -1;
+
+       if (strcasecmp (severity, "FAILURE") == 0)
+               notif_severity = NOTIF_FAILURE;
+       else if (strcmp (severity, "OKAY") == 0)
+               notif_severity = NOTIF_OKAY;
+       else if ((strcmp (severity, "WARNING") == 0)
+                       || (strcmp (severity, "WARN") == 0))
+               notif_severity = NOTIF_WARNING;
+
+       return (notif_severity);
+} /* int parse_notif_severity */
+
 const data_set_t *plugin_get_ds (const char *name)
 {
        data_set_t *ds;
 
+       if (data_sets == NULL)
+       {
+               ERROR ("plugin_get_ds: No data sets are defined yet.");
+               return (NULL);
+       }
+
        if (c_avl_get (data_sets, name, (void *) &ds) != 0)
        {
                DEBUG ("No such dataset registered: %s", name);
@@ -1711,7 +2109,7 @@ static int plugin_notification_meta_add (notification_t *n,
     }
     case NM_TYPE_BOOLEAN:
     {
-      meta->nm_value.nm_boolean = *((bool *) value);
+      meta->nm_value.nm_boolean = *((_Bool *) value);
       break;
     }
     default:
@@ -1765,7 +2163,7 @@ int plugin_notification_meta_add_double (notification_t *n,
 
 int plugin_notification_meta_add_boolean (notification_t *n,
     const char *name,
-    bool value)
+    _Bool value)
 {
   return (plugin_notification_meta_add (n, name, NM_TYPE_BOOLEAN, &value));
 }
@@ -1831,4 +2229,122 @@ int plugin_notification_meta_free (notification_meta_t *n)
   return (0);
 } /* int plugin_notification_meta_free */
 
+static void plugin_ctx_destructor (void *ctx)
+{
+       sfree (ctx);
+} /* void plugin_ctx_destructor */
+
+static plugin_ctx_t ctx_init = { /* interval = */ 0 };
+
+static plugin_ctx_t *plugin_ctx_create (void)
+{
+       plugin_ctx_t *ctx;
+
+       ctx = malloc (sizeof (*ctx));
+       if (ctx == NULL) {
+               char errbuf[1024];
+               ERROR ("Failed to allocate plugin context: %s",
+                               sstrerror (errno, errbuf, sizeof (errbuf)));
+               return NULL;
+       }
+
+       *ctx = ctx_init;
+       assert (plugin_ctx_key_initialized);
+       pthread_setspecific (plugin_ctx_key, ctx);
+       DEBUG("Created new plugin context.");
+       return (ctx);
+} /* int plugin_ctx_create */
+
+void plugin_init_ctx (void)
+{
+       pthread_key_create (&plugin_ctx_key, plugin_ctx_destructor);
+       plugin_ctx_key_initialized = 1;
+} /* void plugin_init_ctx */
+
+plugin_ctx_t plugin_get_ctx (void)
+{
+       plugin_ctx_t *ctx;
+
+       assert (plugin_ctx_key_initialized);
+       ctx = pthread_getspecific (plugin_ctx_key);
+
+       if (ctx == NULL) {
+               ctx = plugin_ctx_create ();
+               /* this must no happen -- exit() instead? */
+               if (ctx == NULL)
+                       return ctx_init;
+       }
+
+       return (*ctx);
+} /* plugin_ctx_t plugin_get_ctx */
+
+plugin_ctx_t plugin_set_ctx (plugin_ctx_t ctx)
+{
+       plugin_ctx_t *c;
+       plugin_ctx_t old;
+
+       assert (plugin_ctx_key_initialized);
+       c = pthread_getspecific (plugin_ctx_key);
+
+       if (c == NULL) {
+               c = plugin_ctx_create ();
+               /* this must no happen -- exit() instead? */
+               if (c == NULL)
+                       return ctx_init;
+       }
+
+       old = *c;
+       *c = ctx;
+
+       return (old);
+} /* void plugin_set_ctx */
+
+cdtime_t plugin_get_interval (void)
+{
+       cdtime_t interval;
+
+       interval = plugin_get_ctx().interval;
+       if (interval > 0)
+               return interval;
+
+       return cf_get_default_interval ();
+} /* cdtime_t plugin_get_interval */
+
+typedef struct {
+       plugin_ctx_t ctx;
+       void *(*start_routine) (void *);
+       void *arg;
+} plugin_thread_t;
+
+static void *plugin_thread_start (void *arg)
+{
+       plugin_thread_t *plugin_thread = arg;
+
+       void *(*start_routine) (void *) = plugin_thread->start_routine;
+       void *plugin_arg = plugin_thread->arg;
+
+       plugin_set_ctx (plugin_thread->ctx);
+
+       free (plugin_thread);
+
+       return start_routine (plugin_arg);
+} /* void *plugin_thread_start */
+
+int plugin_thread_create (pthread_t *thread, const pthread_attr_t *attr,
+               void *(*start_routine) (void *), void *arg)
+{
+       plugin_thread_t *plugin_thread;
+
+       plugin_thread = malloc (sizeof (*plugin_thread));
+       if (plugin_thread == NULL)
+               return -1;
+
+       plugin_thread->ctx           = plugin_get_ctx ();
+       plugin_thread->start_routine = start_routine;
+       plugin_thread->arg           = arg;
+
+       return pthread_create (thread, attr,
+                       plugin_thread_start, plugin_thread);
+} /* int plugin_thread_create */
+
 /* vim: set sw=8 ts=8 noet fdm=marker : */