rrdtool plugin: Make sure "cache_timeout + random_variation" dosn't get negative.
[collectd.git] / src / rrdtool.c
index 986c8b4..b366a9c 100644 (file)
@@ -1,6 +1,8 @@
 /**
  * collectd - src/rrdtool.c
  * Copyright (C) 2006-2008  Florian octo Forster
+ * Copyright (C) 2008-2008  Sebastian Harl
+ * Copyright (C) 2009       Mariusz Gronczewski
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
  *
  * Authors:
  *   Florian octo Forster <octo at verplant.org>
+ *   Sebastian Harl <sh at tokkee.org>
+ *   Mariusz Gronczewski <xani666 at gmail.com>
  **/
 
 #include "collectd.h"
 #include "plugin.h"
 #include "common.h"
 #include "utils_avltree.h"
+#include "utils_rrdcreate.h"
 
 #include <rrd.h>
 
  */
 struct rrd_cache_s
 {
-       int    values_num;
-       char **values;
-       time_t first_value;
-       time_t last_value;
+       int      values_num;
+       char   **values;
+       cdtime_t first_value;
+       cdtime_t last_value;
+       int64_t  random_variation;
        enum
        {
                FLAG_NONE   = 0x00,
@@ -65,27 +71,6 @@ typedef struct rrd_queue_s rrd_queue_t;
 /*
  * Private variables
  */
-static int rra_timespans[] =
-{
-       3600,
-       86400,
-       604800,
-       2678400,
-       31622400
-};
-static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans);
-
-static int *rra_timespans_custom = NULL;
-static int rra_timespans_custom_num = 0;
-
-static char *rra_types[] =
-{
-       "AVERAGE",
-       "MIN",
-       "MAX"
-};
-static int rra_types_num = STATIC_ARRAY_SIZE (rra_types);
-
 static const char *config_keys[] =
 {
        "CacheTimeout",
@@ -96,25 +81,36 @@ static const char *config_keys[] =
        "RRARows",
        "RRATimespan",
        "XFF",
-       "WritesPerSecond"
+       "WritesPerSecond",
+       "RandomTimeout"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
  * is zero a default, depending on the `interval' member of the value list is
  * being used. */
-static char   *datadir   = NULL;
-static int     stepsize  = 0;
-static int     heartbeat = 0;
-static int     rrarows   = 1200;
-static double  xff       = 0.1;
-static double  write_rate = 0.0;
+static char *datadir   = NULL;
+static double write_rate = 0.0;
+static rrdcreate_config_t rrdcreate_config =
+{
+       /* stepsize = */ 0,
+       /* heartbeat = */ 0,
+       /* rrarows = */ 1200,
+       /* xff = */ 0.1,
+
+       /* timespans = */ NULL,
+       /* timespans_num = */ 0,
+
+       /* consolidation_functions = */ NULL,
+       /* consolidation_functions_num = */ 0
+};
 
 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
  * ALWAYS lock `cache_lock' first! */
-static int         cache_timeout = 0;
-static int         cache_flush_timeout = 0;
-static time_t      cache_flush_last;
+static cdtime_t    cache_timeout = 0;
+static cdtime_t    cache_flush_timeout = 0;
+static cdtime_t    random_timeout = TIME_T_TO_CDTIME_T (1);
+static cdtime_t    cache_flush_last;
 static c_avl_tree_t *cache = NULL;
 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
 
@@ -122,7 +118,8 @@ static rrd_queue_t    *queue_head = NULL;
 static rrd_queue_t    *queue_tail = NULL;
 static rrd_queue_t    *flushq_head = NULL;
 static rrd_queue_t    *flushq_tail = NULL;
-static pthread_t       queue_thread = 0;
+static pthread_t       queue_thread;
+static int             queue_thread_running = 1;
 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
 
@@ -132,233 +129,7 @@ static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static int do_shutdown = 0;
 
-/* * * * * * * * * *
- * WARNING:  Magic *
- * * * * * * * * * */
-
-static void rra_free (int rra_num, char **rra_def)
-{
-       int i;
-
-       for (i = 0; i < rra_num; i++)
-       {
-               sfree (rra_def[i]);
-       }
-       sfree (rra_def);
-} /* void rra_free */
-
-static int rra_get (char ***ret, const value_list_t *vl)
-{
-       char **rra_def;
-       int rra_num;
-
-       int *rts;
-       int  rts_num;
-
-       int rra_max;
-
-       int span;
-
-       int cdp_num;
-       int cdp_len;
-       int i, j;
-
-       char buffer[64];
-
-       /* The stepsize we use here: If it is user-set, use it. If not, use the
-        * interval of the value-list. */
-       int ss;
-
-       if (rrarows <= 0)
-       {
-               *ret = NULL;
-               return (-1);
-       }
-
-       ss = (stepsize > 0) ? stepsize : vl->interval;
-       if (ss <= 0)
-       {
-               *ret = NULL;
-               return (-1);
-       }
-
-       /* Use the configured timespans or fall back to the built-in defaults */
-       if (rra_timespans_custom_num != 0)
-       {
-               rts = rra_timespans_custom;
-               rts_num = rra_timespans_custom_num;
-       }
-       else
-       {
-               rts = rra_timespans;
-               rts_num = rra_timespans_num;
-       }
-
-       rra_max = rts_num * rra_types_num;
-
-       if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
-               return (-1);
-       memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
-       rra_num = 0;
-
-       cdp_len = 0;
-       for (i = 0; i < rts_num; i++)
-       {
-               span = rts[i];
-
-               if ((span / ss) < rrarows)
-                       span = ss * rrarows;
-
-               if (cdp_len == 0)
-                       cdp_len = 1;
-               else
-                       cdp_len = (int) floor (((double) span)
-                                       / ((double) (rrarows * ss)));
-
-               cdp_num = (int) ceil (((double) span)
-                               / ((double) (cdp_len * ss)));
-
-               for (j = 0; j < rra_types_num; j++)
-               {
-                       if (rra_num >= rra_max)
-                               break;
-
-                       if (ssnprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u",
-                                               rra_types[j], xff,
-                                               cdp_len, cdp_num) >= sizeof (buffer))
-                       {
-                               ERROR ("rra_get: Buffer would have been truncated.");
-                               continue;
-                       }
-
-                       rra_def[rra_num++] = sstrdup (buffer);
-               }
-       }
-
-#if COLLECT_DEBUG
-       DEBUG ("rra_num = %i", rra_num);
-       for (i = 0; i < rra_num; i++)
-               DEBUG ("  %s", rra_def[i]);
-#endif
-
-       *ret = rra_def;
-       return (rra_num);
-} /* int rra_get */
-
-static void ds_free (int ds_num, char **ds_def)
-{
-       int i;
-
-       for (i = 0; i < ds_num; i++)
-               if (ds_def[i] != NULL)
-                       free (ds_def[i]);
-       free (ds_def);
-}
-
-static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
-{
-       char **ds_def;
-       int ds_num;
-
-       char min[32];
-       char max[32];
-       char buffer[128];
-
-       DEBUG ("ds->ds_num = %i", ds->ds_num);
-
-       ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
-       if (ds_def == NULL)
-       {
-               char errbuf[1024];
-               ERROR ("rrdtool plugin: malloc failed: %s",
-                               sstrerror (errno, errbuf, sizeof (errbuf)));
-               return (-1);
-       }
-       memset (ds_def, '\0', ds->ds_num * sizeof (char *));
-
-       for (ds_num = 0; ds_num < ds->ds_num; ds_num++)
-       {
-               data_source_t *d = ds->ds + ds_num;
-               char *type;
-               int status;
-
-               ds_def[ds_num] = NULL;
-
-               if (d->type == DS_TYPE_COUNTER)
-                       type = "COUNTER";
-               else if (d->type == DS_TYPE_GAUGE)
-                       type = "GAUGE";
-               else
-               {
-                       ERROR ("rrdtool plugin: Unknown DS type: %i",
-                                       d->type);
-                       break;
-               }
-
-               if (isnan (d->min))
-               {
-                       sstrncpy (min, "U", sizeof (min));
-               }
-               else
-                       ssnprintf (min, sizeof (min), "%lf", d->min);
-
-               if (isnan (d->max))
-               {
-                       sstrncpy (max, "U", sizeof (max));
-               }
-               else
-                       ssnprintf (max, sizeof (max), "%lf", d->max);
-
-               status = ssnprintf (buffer, sizeof (buffer),
-                               "DS:%s:%s:%i:%s:%s",
-                               d->name, type,
-                               (heartbeat > 0) ? heartbeat : (2 * vl->interval),
-                               min, max);
-               if ((status < 1) || (status >= sizeof (buffer)))
-                       break;
-
-               ds_def[ds_num] = sstrdup (buffer);
-       } /* for ds_num = 0 .. ds->ds_num */
-
-#if COLLECT_DEBUG
-{
-       int i;
-       DEBUG ("ds_num = %i", ds_num);
-       for (i = 0; i < ds_num; i++)
-               DEBUG ("  %s", ds_def[i]);
-}
-#endif
-
-       if (ds_num != ds->ds_num)
-       {
-               ds_free (ds_num, ds_def);
-               return (-1);
-       }
-
-       *ret = ds_def;
-       return (ds_num);
-}
-
 #if HAVE_THREADSAFE_LIBRRD
-static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
-               int argc, const char **argv)
-{
-       int status;
-
-       optind = 0; /* bug in librrd? */
-       rrd_clear_error ();
-
-       status = rrd_create_r (filename, pdp_step, last_up, argc, (void *) argv);
-
-       if (status != 0)
-       {
-               WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s",
-                               filename, rrd_get_error ());
-       }
-
-       return (status);
-} /* int srrd_create */
-
 static int srrd_update (char *filename, char *template,
                int argc, const char **argv)
 {
@@ -380,59 +151,6 @@ static int srrd_update (char *filename, char *template,
 /* #endif HAVE_THREADSAFE_LIBRRD */
 
 #else /* !HAVE_THREADSAFE_LIBRRD */
-static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
-               int argc, const char **argv)
-{
-       int status;
-
-       int new_argc;
-       char **new_argv;
-
-       char pdp_step_str[16];
-       char last_up_str[16];
-
-       new_argc = 6 + argc;
-       new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
-       if (new_argv == NULL)
-       {
-               ERROR ("rrdtool plugin: malloc failed.");
-               return (-1);
-       }
-
-       if (last_up == 0)
-               last_up = time (NULL) - 10;
-
-       ssnprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step);
-       ssnprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up);
-
-       new_argv[0] = "create";
-       new_argv[1] = filename;
-       new_argv[2] = "-s";
-       new_argv[3] = pdp_step_str;
-       new_argv[4] = "-b";
-       new_argv[5] = last_up_str;
-
-       memcpy (new_argv + 6, argv, argc * sizeof (char *));
-       new_argv[new_argc] = NULL;
-       
-       pthread_mutex_lock (&librrd_lock);
-       optind = 0; /* bug in librrd? */
-       rrd_clear_error ();
-
-       status = rrd_create (new_argc, new_argv);
-       pthread_mutex_unlock (&librrd_lock);
-
-       if (status != 0)
-       {
-               WARNING ("rrdtool plugin: rrd_create (%s) failed: %s",
-                               filename, rrd_get_error ());
-       }
-
-       sfree (new_argv);
-
-       return (status);
-} /* int srrd_create */
-
 static int srrd_update (char *filename, char *template,
                int argc, const char **argv)
 {
@@ -476,68 +194,18 @@ static int srrd_update (char *filename, char *template,
 } /* int srrd_update */
 #endif /* !HAVE_THREADSAFE_LIBRRD */
 
-static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
-{
-       char **argv;
-       int argc;
-       char **rra_def;
-       int rra_num;
-       char **ds_def;
-       int ds_num;
-       int status = 0;
-
-       if (check_create_dir (filename))
-               return (-1);
-
-       if ((rra_num = rra_get (&rra_def, vl)) < 1)
-       {
-               ERROR ("rrd_create_file failed: Could not calculate RRAs");
-               return (-1);
-       }
-
-       if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
-       {
-               ERROR ("rrd_create_file failed: Could not calculate DSes");
-               return (-1);
-       }
-
-       argc = ds_num + rra_num;
-
-       if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
-       {
-               char errbuf[1024];
-               ERROR ("rrd_create failed: %s",
-                               sstrerror (errno, errbuf, sizeof (errbuf)));
-               return (-1);
-       }
-
-       memcpy (argv, ds_def, ds_num * sizeof (char *));
-       memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *));
-       argv[ds_num + rra_num] = NULL;
-
-       assert (vl->time > 10);
-       status = srrd_create (filename,
-                       (stepsize > 0) ? stepsize : vl->interval,
-                       vl->time - 10,
-                       argc, (const char **)argv);
-
-       free (argv);
-       ds_free (ds_num, ds_def);
-       rra_free (rra_num, rra_def);
-
-       return (status);
-}
-
 static int value_list_to_string (char *buffer, int buffer_len,
                const data_set_t *ds, const value_list_t *vl)
 {
        int offset;
        int status;
+       time_t tt;
        int i;
 
        memset (buffer, '\0', buffer_len);
 
-       status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
+       tt = CDTIME_T_TO_TIME_T (vl->time);
+       status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) tt);
        if ((status < 1) || (status >= buffer_len))
                return (-1);
        offset = status;
@@ -545,15 +213,23 @@ static int value_list_to_string (char *buffer, int buffer_len,
        for (i = 0; i < ds->ds_num; i++)
        {
                if ((ds->ds[i].type != DS_TYPE_COUNTER)
-                               && (ds->ds[i].type != DS_TYPE_GAUGE))
+                               && (ds->ds[i].type != DS_TYPE_GAUGE)
+                               && (ds->ds[i].type != DS_TYPE_DERIVE)
+                               && (ds->ds[i].type != DS_TYPE_ABSOLUTE))
                        return (-1);
 
                if (ds->ds[i].type == DS_TYPE_COUNTER)
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                        ":%llu", vl->values[i].counter);
-               else
+               else if (ds->ds[i].type == DS_TYPE_GAUGE)
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                        ":%lf", vl->values[i].gauge);
+               else if (ds->ds[i].type == DS_TYPE_DERIVE)
+                       status = ssnprintf (buffer + offset, buffer_len - offset,
+                                       ":%"PRIi64, vl->values[i].derive);
+               else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
+                       status = ssnprintf (buffer + offset, buffer_len - offset,
+                                       ":%"PRIu64, vl->values[i].absolute);
 
                if ((status < 1) || (status >= (buffer_len - offset)))
                        return (-1);
@@ -565,7 +241,7 @@ static int value_list_to_string (char *buffer, int buffer_len,
 } /* int value_list_to_string */
 
 static int value_list_to_filename (char *buffer, int buffer_len,
-               const data_set_t *ds, const value_list_t *vl)
+               const data_set_t __attribute__((unused)) *ds, const value_list_t *vl)
 {
        int offset = 0;
        int status;
@@ -608,7 +284,7 @@ static int value_list_to_filename (char *buffer, int buffer_len,
        return (0);
 } /* int value_list_to_filename */
 
-static void *rrd_queue_thread (void *data)
+static void *rrd_queue_thread (void __attribute__((unused)) *data)
 {
         struct timeval tv_next_update;
         struct timeval tv_now;
@@ -629,7 +305,7 @@ static void *rrd_queue_thread (void *data)
 
                 pthread_mutex_lock (&queue_lock);
                 /* Wait for values to arrive */
-                while (true)
+                while (42)
                 {
                   struct timespec ts_wait;
 
@@ -653,10 +329,9 @@ static void *rrd_queue_thread (void *data)
                     break;
 
                   gettimeofday (&tv_now, /* timezone = */ NULL);
-                  status = timeval_sub_timespec (&tv_next_update, &tv_now,
-                      &ts_wait);
+                  status = timeval_cmp (tv_next_update, tv_now, NULL);
                   /* We're good to go */
-                  if (status != 0)
+                  if (status <= 0)
                     break;
 
                   /* We're supposed to wait a bit with this update, so we'll
@@ -669,7 +344,7 @@ static void *rrd_queue_thread (void *data)
                       &ts_wait);
                   if (status == ETIMEDOUT)
                     break;
-                } /* while (true) */
+                } /* while (42) */
 
                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
                  * the same time, ALWAYS lock `cache_lock' first! */
@@ -746,8 +421,9 @@ static void *rrd_queue_thread (void *data)
                /* Write the values to the RRD-file */
                srrd_update (queue_entry->filename, NULL,
                                values_num, (const char **)values);
-               DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
-                               values_num, queue_entry->filename);
+               DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s",
+                               values_num, (values_num == 1) ? "" : "s",
+                               queue_entry->filename);
 
                for (i = 0; i < values_num; i++)
                {
@@ -758,11 +434,6 @@ static void *rrd_queue_thread (void *data)
                sfree (queue_entry);
        } /* while (42) */
 
-       pthread_mutex_lock (&cache_lock);
-       c_avl_destroy (cache);
-       cache = NULL;
-       pthread_mutex_unlock (&cache_lock);
-
        pthread_exit ((void *) 0);
        return ((void *) 0);
 } /* void *rrd_queue_thread */
@@ -841,10 +512,11 @@ static int rrd_queue_dequeue (const char *filename,
   return (0);
 } /* int rrd_queue_dequeue */
 
-static void rrd_cache_flush (int timeout)
+/* XXX: You must hold "cache_lock" when calling this function! */
+static void rrd_cache_flush (cdtime_t timeout)
 {
        rrd_cache_t *rc;
-       time_t       now;
+       cdtime_t     now;
 
        char **keys = NULL;
        int    keys_num = 0;
@@ -853,9 +525,11 @@ static void rrd_cache_flush (int timeout)
        c_avl_iterator_t *iter;
        int i;
 
-       DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
+       DEBUG ("rrdtool plugin: Flushing cache, timeout = %.3f",
+                       CDTIME_T_TO_DOUBLE (timeout));
 
-       now = time (NULL);
+       now = cdtime ();
+       timeout = TIME_T_TO_CDTIME_T (timeout);
 
        /* Build a list of entries to be flushed */
        iter = c_avl_get_iterator (cache);
@@ -863,7 +537,9 @@ static void rrd_cache_flush (int timeout)
        {
                if (rc->flags != FLAG_NONE)
                        continue;
-               else if ((now - rc->first_value) < timeout)
+               /* timeout == 0  =>  flush everything */
+               else if ((timeout != 0)
+                               && ((now - rc->first_value) < timeout))
                        continue;
                else if (rc->values_num > 0)
                {
@@ -916,10 +592,11 @@ static void rrd_cache_flush (int timeout)
        cache_flush_last = now;
 } /* void rrd_cache_flush */
 
-static int rrd_cache_flush_identifier (int timeout, const char *identifier)
+static int rrd_cache_flush_identifier (cdtime_t timeout,
+    const char *identifier)
 {
   rrd_cache_t *rc;
-  time_t now;
+  cdtime_t now;
   int status;
   char key[2048];
 
@@ -929,7 +606,7 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier)
     return (0);
   }
 
-  now = time (NULL);
+  now = cdtime ();
 
   if (datadir == NULL)
     snprintf (key, sizeof (key), "%s.rrd",
@@ -942,7 +619,7 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier)
   status = c_avl_get (cache, key, (void *) &rc);
   if (status != 0)
   {
-    WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
+    INFO ("rrdtool plugin: rrd_cache_flush_identifier: "
         "c_avl_get (%s) failed. Does that file really exist?",
         key);
     return (status);
@@ -973,8 +650,43 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier)
   return (status);
 } /* int rrd_cache_flush_identifier */
 
+static int64_t rrd_get_random_variation (void)
+{
+  double dbl_timeout;
+  cdtime_t ctm_timeout;
+  double rand_fact;
+  _Bool negative;
+  int64_t ret;
+
+  if (random_timeout <= 0)
+    return (0);
+
+  /* Assure that "cache_timeout + random_variation" is never negative. */
+  if (random_timeout > cache_timeout)
+  {
+         INFO ("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
+                         CDTIME_T_TO_DOUBLE (cache_timeout));
+         random_timeout = cache_timeout;
+  }
+
+  /* This seems a bit complicated, but "random_timeout" is likely larger than
+   * RAND_MAX, so we can't simply use modulo here. */
+  dbl_timeout = CDTIME_T_TO_DOUBLE (random_timeout);
+  rand_fact = ((double) random ())
+    / ((double) RAND_MAX);
+  negative = (_Bool) (random () % 2);
+
+  ctm_timeout = DOUBLE_TO_CDTIME_T (dbl_timeout * rand_fact);
+
+  ret = (int64_t) ctm_timeout;
+  if (negative)
+    ret *= -1;
+
+  return (ret);
+} /* int64_t rrd_get_random_variation */
+
 static int rrd_cache_insert (const char *filename,
-               const char *value, time_t value_time)
+               const char *value, cdtime_t value_time)
 {
        rrd_cache_t *rc = NULL;
        int new_rc = 0;
@@ -995,13 +707,14 @@ static int rrd_cache_insert (const char *filename,
 
        if (rc == NULL)
        {
-               rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
+               rc = malloc (sizeof (*rc));
                if (rc == NULL)
                        return (-1);
                rc->values_num = 0;
                rc->values = NULL;
                rc->first_value = 0;
                rc->last_value = 0;
+               rc->random_variation = rrd_get_random_variation ();
                rc->flags = FLAG_NONE;
                new_rc = 1;
        }
@@ -1009,9 +722,9 @@ static int rrd_cache_insert (const char *filename,
        if (rc->last_value >= value_time)
        {
                pthread_mutex_unlock (&cache_lock);
-               WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
-                               (unsigned int) rc->last_value,
-                               (unsigned int) value_time);
+               DEBUG ("rrdtool plugin: (rc->last_value = %"PRIu64") "
+                               ">= (value_time = %"PRIu64")",
+                               rc->last_value, value_time);
                return (-1);
        }
 
@@ -1068,11 +781,11 @@ static int rrd_cache_insert (const char *filename,
        }
 
        DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
-                       "values_num = %i; age = %lu;",
+                       "values_num = %i; age = %.3f;",
                        filename, rc->values_num,
-                       (unsigned long)(rc->last_value - rc->first_value));
+                       CDTIME_T_TO_DOUBLE (rc->last_value - rc->first_value));
 
-       if ((rc->last_value - rc->first_value) >= cache_timeout)
+       if ((rc->last_value - rc->first_value) >= (cache_timeout + rc->random_variation))
        {
                /* XXX: If you need to lock both, cache_lock and queue_lock, at
                 * the same time, ALWAYS lock `cache_lock' first! */
@@ -1083,6 +796,8 @@ static int rrd_cache_insert (const char *filename,
                        status = rrd_queue_enqueue (filename, &queue_head, &queue_tail);
                        if (status == 0)
                                rc->flags = FLAG_QUEUED;
+
+                        rc->random_variation = rrd_get_random_variation ();
                }
                else
                {
@@ -1091,7 +806,7 @@ static int rrd_cache_insert (const char *filename,
        }
 
        if ((cache_timeout > 0) &&
-                       ((time (NULL) - cache_flush_last) > cache_flush_timeout))
+                       ((cdtime () - cache_flush_last) > cache_flush_timeout))
                rrd_cache_flush (cache_flush_timeout);
 
        pthread_mutex_unlock (&cache_lock);
@@ -1099,6 +814,59 @@ static int rrd_cache_insert (const char *filename,
        return (0);
 } /* int rrd_cache_insert */
 
+static int rrd_cache_destroy (void) /* {{{ */
+{
+  void *key = NULL;
+  void *value = NULL;
+
+  int non_empty = 0;
+
+  pthread_mutex_lock (&cache_lock);
+
+  if (cache == NULL)
+  {
+    pthread_mutex_unlock (&cache_lock);
+    return (0);
+  }
+
+  while (c_avl_pick (cache, &key, &value) == 0)
+  {
+    rrd_cache_t *rc;
+    int i;
+
+    sfree (key);
+    key = NULL;
+
+    rc = value;
+    value = NULL;
+
+    if (rc->values_num > 0)
+      non_empty++;
+
+    for (i = 0; i < rc->values_num; i++)
+      sfree (rc->values[i]);
+    sfree (rc->values);
+    sfree (rc);
+  }
+
+  c_avl_destroy (cache);
+  cache = NULL;
+
+  if (non_empty > 0)
+  {
+    INFO ("rrdtool plugin: %i cache %s had values when destroying the cache.",
+        non_empty, (non_empty == 1) ? "entry" : "entries");
+  }
+  else
+  {
+    DEBUG ("rrdtool plugin: No values have been lost "
+        "when destroying the cache.");
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+  return (0);
+} /* }}} int rrd_cache_destroy */
+
 static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
 {
        int a = *((int *) a_ptr);
@@ -1112,13 +880,17 @@ static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
                return (0);
 } /* int rrd_compare_numeric */
 
-static int rrd_write (const data_set_t *ds, const value_list_t *vl)
+static int rrd_write (const data_set_t *ds, const value_list_t *vl,
+               user_data_t __attribute__((unused)) *user_data)
 {
        struct stat  statbuf;
        char         filename[512];
        char         values[512];
        int          status;
 
+       if (do_shutdown)
+               return (0);
+
        if (0 != strcmp (ds->type, vl->type)) {
                ERROR ("rrdtool plugin: DS type does not match value list type");
                return -1;
@@ -1134,7 +906,9 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl)
        {
                if (errno == ENOENT)
                {
-                       if (rrd_create_file (filename, ds, vl))
+                       status = cu_rrd_create_file (filename,
+                                       ds, vl, &rrdcreate_config);
+                       if (status != 0)
                                return (-1);
                }
                else
@@ -1158,7 +932,8 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl)
        return (status);
 } /* int rrd_write */
 
-static int rrd_flush (int timeout, const char *identifier)
+static int rrd_flush (cdtime_t timeout, const char *identifier,
+               __attribute__((unused)) user_data_t *user_data)
 {
        pthread_mutex_lock (&cache_lock);
 
@@ -1177,7 +952,7 @@ static int rrd_config (const char *key, const char *value)
 {
        if (strcasecmp ("CacheTimeout", key) == 0)
        {
-               int tmp = atoi (value);
+               double tmp = atof (value);
                if (tmp < 0)
                {
                        fprintf (stderr, "rrdtool: `CacheTimeout' must "
@@ -1186,7 +961,7 @@ static int rrd_config (const char *key, const char *value)
                                        "be greater than 0.\n");
                        return (1);
                }
-               cache_timeout = tmp;
+               cache_timeout = DOUBLE_TO_CDTIME_T (tmp);
        }
        else if (strcasecmp ("CacheFlush", key) == 0)
        {
@@ -1223,15 +998,15 @@ static int rrd_config (const char *key, const char *value)
        }
        else if (strcasecmp ("StepSize", key) == 0)
        {
-               stepsize = atoi (value);
-               if (stepsize < 0)
-                       stepsize = 0;
+               int temp = atoi (value);
+               if (temp > 0)
+                       rrdcreate_config.stepsize = temp;
        }
        else if (strcasecmp ("HeartBeat", key) == 0)
        {
-               heartbeat = atoi (value);
-               if (heartbeat < 0)
-                       heartbeat = 0;
+               int temp = atoi (value);
+               if (temp > 0)
+                       rrdcreate_config.heartbeat = temp;
        }
        else if (strcasecmp ("RRARows", key) == 0)
        {
@@ -1244,7 +1019,7 @@ static int rrd_config (const char *key, const char *value)
                                        "be greater than 0.\n");
                        return (1);
                }
-               rrarows = tmp;
+               rrdcreate_config.rrarows = tmp;
        }
        else if (strcasecmp ("RRATimespan", key) == 0)
        {
@@ -1263,8 +1038,8 @@ static int rrd_config (const char *key, const char *value)
                {
                        dummy = NULL;
                        
-                       tmp_alloc = realloc (rra_timespans_custom,
-                                       sizeof (int) * (rra_timespans_custom_num + 1));
+                       tmp_alloc = realloc (rrdcreate_config.timespans,
+                                       sizeof (int) * (rrdcreate_config.timespans_num + 1));
                        if (tmp_alloc == NULL)
                        {
                                fprintf (stderr, "rrdtool: realloc failed.\n");
@@ -1272,15 +1047,15 @@ static int rrd_config (const char *key, const char *value)
                                free (value_copy);
                                return (1);
                        }
-                       rra_timespans_custom = tmp_alloc;
-                       rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr);
-                       if (rra_timespans_custom[rra_timespans_custom_num] != 0)
-                               rra_timespans_custom_num++;
+                       rrdcreate_config.timespans = tmp_alloc;
+                       rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi (ptr);
+                       if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
+                               rrdcreate_config.timespans_num++;
                } /* while (strtok_r) */
 
-               qsort (/* base = */ rra_timespans_custom,
-                               /* nmemb  = */ rra_timespans_custom_num,
-                               /* size   = */ sizeof (rra_timespans_custom[0]),
+               qsort (/* base = */ rrdcreate_config.timespans,
+                               /* nmemb  = */ rrdcreate_config.timespans_num,
+                               /* size   = */ sizeof (rrdcreate_config.timespans[0]),
                                /* compar = */ rrd_compare_numeric);
 
                free (value_copy);
@@ -1296,7 +1071,7 @@ static int rrd_config (const char *key, const char *value)
                                        "be in the range 0 to 1 (exclusive).");
                        return (1);
                }
-               xff = tmp;
+               rrdcreate_config.xff = tmp;
        }
        else if (strcasecmp ("WritesPerSecond", key) == 0)
        {
@@ -1317,6 +1092,23 @@ static int rrd_config (const char *key, const char *value)
                        write_rate = 1.0 / wps;
                }
        }
+       else if (strcasecmp ("RandomTimeout", key) == 0)
+        {
+               double tmp;
+
+               tmp = atof (value);
+               if (tmp < 0.0)
+               {
+                       fprintf (stderr, "rrdtool: `RandomTimeout' must "
+                                       "be greater than or equal to zero.\n");
+                       ERROR ("rrdtool: `RandomTimeout' must "
+                                       "be greater then or equal to zero.");
+               }
+               else
+               {
+                       random_timeout = DOUBLE_TO_CDTIME_T (tmp);
+               }
+       }
        else
        {
                return (-1);
@@ -1327,7 +1119,7 @@ static int rrd_config (const char *key, const char *value)
 static int rrd_shutdown (void)
 {
        pthread_mutex_lock (&cache_lock);
-       rrd_cache_flush (-1);
+       rrd_cache_flush (0);
        pthread_mutex_unlock (&cache_lock);
 
        pthread_mutex_lock (&queue_lock);
@@ -1335,31 +1127,52 @@ static int rrd_shutdown (void)
        pthread_cond_signal (&queue_cond);
        pthread_mutex_unlock (&queue_lock);
 
+       if ((queue_thread_running != 0)
+                       && ((queue_head != NULL) || (flushq_head != NULL)))
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread. "
+                               "This may take a while.");
+       }
+       else if (queue_thread_running != 0)
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread.");
+       }
+
        /* Wait for all the values to be written to disk before returning. */
-       if (queue_thread != 0)
+       if (queue_thread_running != 0)
        {
                pthread_join (queue_thread, NULL);
-               queue_thread = 0;
+               memset (&queue_thread, 0, sizeof (queue_thread));
+               queue_thread_running = 0;
                DEBUG ("rrdtool plugin: queue_thread exited.");
        }
 
+       rrd_cache_destroy ();
+
        return (0);
 } /* int rrd_shutdown */
 
 static int rrd_init (void)
 {
+       static int init_once = 0;
        int status;
 
-       if (stepsize < 0)
-               stepsize = 0;
-       if (heartbeat <= 0)
-               heartbeat = 2 * stepsize;
+       if (init_once != 0)
+               return (0);
+       init_once = 1;
+
+       if (rrdcreate_config.stepsize < 0)
+               rrdcreate_config.stepsize = 0;
+       if (rrdcreate_config.heartbeat <= 0)
+               rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
 
-       if ((heartbeat > 0) && (heartbeat < interval_g))
+       if ((rrdcreate_config.heartbeat > 0)
+                       && (rrdcreate_config.heartbeat < CDTIME_T_TO_TIME_T (interval_g)))
                WARNING ("rrdtool plugin: Your `heartbeat' is "
                                "smaller than your `interval'. This will "
                                "likely cause problems.");
-       else if ((stepsize > 0) && (stepsize < interval_g))
+       else if ((rrdcreate_config.stepsize > 0)
+                       && (rrdcreate_config.stepsize < CDTIME_T_TO_TIME_T (interval_g)))
                WARNING ("rrdtool plugin: Your `stepsize' is "
                                "smaller than your `interval'. This will "
                                "create needlessly big RRD-files.");
@@ -1374,10 +1187,9 @@ static int rrd_init (void)
                return (-1);
        }
 
-       cache_flush_last = time (NULL);
-       if (cache_timeout < 2)
+       cache_flush_last = cdtime ();
+       if (cache_timeout == 0)
        {
-               cache_timeout = 0;
                cache_flush_timeout = 0;
        }
        else if (cache_flush_timeout < cache_timeout)
@@ -1385,17 +1197,22 @@ static int rrd_init (void)
 
        pthread_mutex_unlock (&cache_lock);
 
-       status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
+       status = pthread_create (&queue_thread, /* attr = */ NULL,
+                       rrd_queue_thread, /* args = */ NULL);
        if (status != 0)
        {
                ERROR ("rrdtool plugin: Cannot create queue-thread.");
                return (-1);
        }
+       queue_thread_running = 1;
 
        DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
                        " heartbeat = %i; rrarows = %i; xff = %lf;",
                        (datadir == NULL) ? "(null)" : datadir,
-                       stepsize, heartbeat, rrarows, xff);
+                       rrdcreate_config.stepsize,
+                       rrdcreate_config.heartbeat,
+                       rrdcreate_config.rrarows,
+                       rrdcreate_config.xff);
 
        return (0);
 } /* int rrd_init */
@@ -1405,7 +1222,7 @@ void module_register (void)
        plugin_register_config ("rrdtool", rrd_config,
                        config_keys, config_keys_num);
        plugin_register_init ("rrdtool", rrd_init);
-       plugin_register_write ("rrdtool", rrd_write);
-       plugin_register_flush ("rrdtool", rrd_flush);
+       plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL);
+       plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL);
        plugin_register_shutdown ("rrdtool", rrd_shutdown);
 }