bindings/erlang: Added README.
[collectd.git] / src / rrdtool.c
index 6ddbfc9..ee5d70c 100644 (file)
@@ -1,6 +1,8 @@
 /**
  * collectd - src/rrdtool.c
  * Copyright (C) 2006-2008  Florian octo Forster
+ * Copyright (C) 2008-2008  Sebastian Harl
+ * Copyright (C) 2009       Mariusz Gronczewski
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -17,6 +19,8 @@
  *
  * Authors:
  *   Florian octo Forster <octo at verplant.org>
+ *   Sebastian Harl <sh at tokkee.org>
+ *   Mariusz Gronczewski <xani666 at gmail.com>
  **/
 
 #include "collectd.h"
@@ -40,6 +44,7 @@ struct rrd_cache_s
        char **values;
        time_t first_value;
        time_t last_value;
+       int random_variation;
        enum
        {
                FLAG_NONE   = 0x00,
@@ -76,7 +81,8 @@ static const char *config_keys[] =
        "RRARows",
        "RRATimespan",
        "XFF",
-       "WritesPerSecond"
+       "WritesPerSecond",
+       "RandomTimeout"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
@@ -103,6 +109,7 @@ static rrdcreate_config_t rrdcreate_config =
  * ALWAYS lock `cache_lock' first! */
 static int         cache_timeout = 0;
 static int         cache_flush_timeout = 0;
+static int         random_timeout = 1;
 static time_t      cache_flush_last;
 static c_avl_tree_t *cache = NULL;
 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -111,7 +118,8 @@ static rrd_queue_t    *queue_head = NULL;
 static rrd_queue_t    *queue_tail = NULL;
 static rrd_queue_t    *flushq_head = NULL;
 static rrd_queue_t    *flushq_tail = NULL;
-static pthread_t       queue_thread = 0;
+static pthread_t       queue_thread;
+static int             queue_thread_running = 1;
 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
 
@@ -203,15 +211,23 @@ static int value_list_to_string (char *buffer, int buffer_len,
        for (i = 0; i < ds->ds_num; i++)
        {
                if ((ds->ds[i].type != DS_TYPE_COUNTER)
-                               && (ds->ds[i].type != DS_TYPE_GAUGE))
+                               && (ds->ds[i].type != DS_TYPE_GAUGE)
+                               && (ds->ds[i].type != DS_TYPE_DERIVE)
+                               && (ds->ds[i].type != DS_TYPE_ABSOLUTE))
                        return (-1);
 
                if (ds->ds[i].type == DS_TYPE_COUNTER)
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                        ":%llu", vl->values[i].counter);
-               else
+               else if (ds->ds[i].type == DS_TYPE_GAUGE)
                        status = ssnprintf (buffer + offset, buffer_len - offset,
                                        ":%lf", vl->values[i].gauge);
+               else if (ds->ds[i].type == DS_TYPE_DERIVE)
+                       status = ssnprintf (buffer + offset, buffer_len - offset,
+                                       ":%"PRIi64, vl->values[i].derive);
+               else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
+                       status = ssnprintf (buffer + offset, buffer_len - offset,
+                                       ":%"PRIu64, vl->values[i].absolute);
 
                if ((status < 1) || (status >= (buffer_len - offset)))
                        return (-1);
@@ -279,14 +295,17 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data)
                rrd_cache_t *cache_entry;
                char **values;
                int    values_num;
+               int    status;
                int    i;
 
+               values = NULL;
+               values_num = 0;
+
                 pthread_mutex_lock (&queue_lock);
                 /* Wait for values to arrive */
                 while (true)
                 {
                   struct timespec ts_wait;
-                  int status;
 
                   while ((flushq_head == NULL) && (queue_head == NULL)
                       && (do_shutdown == 0))
@@ -361,17 +380,28 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data)
                 * we make a copy of it's values */
                pthread_mutex_lock (&cache_lock);
 
-               c_avl_get (cache, queue_entry->filename, (void *) &cache_entry);
+               status = c_avl_get (cache, queue_entry->filename,
+                               (void *) &cache_entry);
 
-               values = cache_entry->values;
-               values_num = cache_entry->values_num;
+               if (status == 0)
+               {
+                       values = cache_entry->values;
+                       values_num = cache_entry->values_num;
 
-               cache_entry->values = NULL;
-               cache_entry->values_num = 0;
-               cache_entry->flags = FLAG_NONE;
+                       cache_entry->values = NULL;
+                       cache_entry->values_num = 0;
+                       cache_entry->flags = FLAG_NONE;
+               }
 
                pthread_mutex_unlock (&cache_lock);
 
+               if (status != 0)
+               {
+                       sfree (queue_entry->filename);
+                       sfree (queue_entry);
+                       continue;
+               }
+
                /* Update `tv_next_update' */
                if (write_rate > 0.0) 
                 {
@@ -389,8 +419,9 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data)
                /* Write the values to the RRD-file */
                srrd_update (queue_entry->filename, NULL,
                                values_num, (const char **)values);
-               DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
-                               values_num, queue_entry->filename);
+               DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s",
+                               values_num, (values_num == 1) ? "" : "s",
+                               queue_entry->filename);
 
                for (i = 0; i < values_num; i++)
                {
@@ -585,7 +616,7 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier)
   status = c_avl_get (cache, key, (void *) &rc);
   if (status != 0)
   {
-    WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
+    INFO ("rrdtool plugin: rrd_cache_flush_identifier: "
         "c_avl_get (%s) failed. Does that file really exist?",
         key);
     return (status);
@@ -625,6 +656,15 @@ static int rrd_cache_insert (const char *filename,
 
        pthread_mutex_lock (&cache_lock);
 
+       /* This shouldn't happen, but it did happen at least once, so we'll be
+        * careful. */
+       if (cache == NULL)
+       {
+               pthread_mutex_unlock (&cache_lock);
+               WARNING ("rrdtool plugin: cache == NULL.");
+               return (-1);
+       }
+
        c_avl_get (cache, filename, (void *) &rc);
 
        if (rc == NULL)
@@ -706,7 +746,7 @@ static int rrd_cache_insert (const char *filename,
                        filename, rc->values_num,
                        (unsigned long)(rc->last_value - rc->first_value));
 
-       if ((rc->last_value - rc->first_value) >= cache_timeout)
+       if ((rc->last_value + rc->random_variation - rc->first_value) >= cache_timeout)
        {
                /* XXX: If you need to lock both, cache_lock and queue_lock, at
                 * the same time, ALWAYS lock `cache_lock' first! */
@@ -717,6 +757,18 @@ static int rrd_cache_insert (const char *filename,
                        status = rrd_queue_enqueue (filename, &queue_head, &queue_tail);
                        if (status == 0)
                                rc->flags = FLAG_QUEUED;
+
+                       /* Update the jitter value. Negative values are
+                        * slightly preferred. */
+                       if (random_timeout > 0)
+                       {
+                               rc->random_variation = (rand () % (2 * random_timeout))
+                                       - random_timeout;
+                       }
+                       else
+                       {
+                               rc->random_variation = 0;
+                       }
                }
                else
                {
@@ -955,6 +1007,23 @@ static int rrd_config (const char *key, const char *value)
                        write_rate = 1.0 / wps;
                }
        }
+       else if (strcasecmp ("RandomTimeout", key) == 0)
+        {
+               int tmp;
+
+               tmp = atoi (value);
+               if (tmp < 0)
+               {
+                       fprintf (stderr, "rrdtool: `RandomTimeout' must "
+                                       "be greater than or equal to zero.\n");
+                       ERROR ("rrdtool: `RandomTimeout' must "
+                                       "be greater then or equal to zero.");
+               }
+               else
+               {
+                       random_timeout = tmp;
+               }
+       }
        else
        {
                return (-1);
@@ -973,21 +1042,40 @@ static int rrd_shutdown (void)
        pthread_cond_signal (&queue_cond);
        pthread_mutex_unlock (&queue_lock);
 
+       if ((queue_thread_running != 0)
+                       && ((queue_head != NULL) || (flushq_head != NULL)))
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread. "
+                               "This may take a while.");
+       }
+       else if (queue_thread_running != 0)
+       {
+               INFO ("rrdtool plugin: Shutting down the queue thread.");
+       }
+
        /* Wait for all the values to be written to disk before returning. */
-       if (queue_thread != 0)
+       if (queue_thread_running != 0)
        {
                pthread_join (queue_thread, NULL);
-               queue_thread = 0;
+               memset (&queue_thread, 0, sizeof (queue_thread));
+               queue_thread_running = 0;
                DEBUG ("rrdtool plugin: queue_thread exited.");
        }
 
+       /* TODO: Maybe it'd be a good idea to free the cache here.. */
+
        return (0);
 } /* int rrd_shutdown */
 
 static int rrd_init (void)
 {
+       static int init_once = 0;
        int status;
 
+       if (init_once != 0)
+               return (0);
+       init_once = 1;
+
        if (rrdcreate_config.stepsize < 0)
                rrdcreate_config.stepsize = 0;
        if (rrdcreate_config.heartbeat <= 0)
@@ -1025,12 +1113,14 @@ static int rrd_init (void)
 
        pthread_mutex_unlock (&cache_lock);
 
-       status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
+       status = pthread_create (&queue_thread, /* attr = */ NULL,
+                       rrd_queue_thread, /* args = */ NULL);
        if (status != 0)
        {
                ERROR ("rrdtool plugin: Cannot create queue-thread.");
                return (-1);
        }
+       queue_thread_running = 1;
 
        DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
                        " heartbeat = %i; rrarows = %i; xff = %lf;",