/**
* collectd - src/rrdtool.c
- * Copyright (C) 2006 Florian octo Forster
+ * Copyright (C) 2006,2007 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
#include "collectd.h"
#include "plugin.h"
#include "common.h"
-#include "utils_llist.h"
-#include "utils_debug.h"
+#include "utils_avltree.h"
-/*
- * This weird macro cascade forces the glibc to define `NAN'. I don't know
- * another way to solve this, so more intelligent solutions are welcome. -octo
- */
-#ifndef __USE_ISOC99
-# define DISABLE__USE_ISOC99 1
-# define __USE_ISOC99 1
-#endif
-#include <math.h>
-#ifdef DISABLE__USE_ISOC99
-# undef DISABLE__USE_ISOC99
-# undef __USE_ISOC99
+#include <rrd.h>
+
+#if HAVE_PTHREAD_H
+# include <pthread.h>
#endif
/*
int values_num;
char **values;
time_t first_value;
+ time_t last_value;
+ enum
+ {
+ FLAG_NONE = 0x00,
+ FLAG_QUEUED = 0x01
+ } flags;
};
typedef struct rrd_cache_s rrd_cache_t;
+struct rrd_queue_s
+{
+ char *filename;
+ struct rrd_queue_s *next;
+};
+typedef struct rrd_queue_s rrd_queue_t;
+
/*
* Private variables
*/
86400,
604800,
2678400,
- 31622400,
- 0
+ 31622400
};
-static int rra_timespans_num = 5;
+static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans);
+
+static int *rra_timespans_custom = NULL;
+static int rra_timespans_custom_num = 0;
static char *rra_types[] =
{
"AVERAGE",
"MIN",
- "MAX",
- NULL
+ "MAX"
};
-static int rra_types_num = 3;
+static int rra_types_num = STATIC_ARRAY_SIZE (rra_types);
static const char *config_keys[] =
{
"CacheTimeout",
- NULL
+ "CacheFlush",
+ "DataDir",
+ "StepSize",
+ "HeartBeat",
+ "RRARows",
+ "RRATimespan",
+ "XFF"
};
-static int config_keys_num = 1;
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+/* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
+ * is zero a default, depending on the `interval' member of the value list is
+ * being used. */
+static char *datadir = NULL;
+static int stepsize = 0;
+static int heartbeat = 0;
+static int rrarows = 1200;
+static double xff = 0.1;
+
+/* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
+ * ALWAYS lock `cache_lock' first! */
+static int cache_timeout = 0;
+static int cache_flush_timeout = 0;
+static time_t cache_flush_last;
+static c_avl_tree_t *cache = NULL;
+static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static rrd_queue_t *queue_head = NULL;
+static rrd_queue_t *queue_tail = NULL;
+static pthread_t queue_thread = 0;
+static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
+
+#if !HAVE_THREADSAFE_LIBRRD
+static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
+#endif
-static int cache_timeout = 0;
-static time_t cache_flush;
-static llist_t *cache = NULL;
+static int do_shutdown = 0;
/* * * * * * * * * *
* WARNING: Magic *
* * * * * * * * * */
-static int rra_get (char ***ret)
+
+static void rra_free (int rra_num, char **rra_def)
{
- static char **rra_def = NULL;
- static int rra_num = 0;
+ int i;
+
+ for (i = 0; i < rra_num; i++)
+ {
+ sfree (rra_def[i]);
+ }
+ sfree (rra_def);
+} /* void rra_free */
+
+static int rra_get (char ***ret, const value_list_t *vl)
+{
+ char **rra_def;
+ int rra_num;
+
+ int *rts;
+ int rts_num;
- int rra_max = rra_timespans_num * rra_types_num;
+ int rra_max;
- int step;
- int rows;
int span;
int cdp_num;
char buffer[64];
- if ((rra_num != 0) && (rra_def != NULL))
- {
- *ret = rra_def;
- return (rra_num);
- }
+ /* The stepsize we use here: If it is user-set, use it. If not, use the
+ * interval of the value-list. */
+ int ss;
- if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
+ if (rrarows <= 0)
+ {
+ *ret = NULL;
return (-1);
- memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
-
- step = atoi (COLLECTD_STEP);
- rows = atoi (COLLECTD_ROWS);
+ }
- if ((step <= 0) || (rows <= 0))
+ ss = (stepsize > 0) ? stepsize : vl->interval;
+ if (ss <= 0)
{
*ret = NULL;
return (-1);
}
+ /* Use the configured timespans or fall back to the built-in defaults */
+ if (rra_timespans_custom_num != 0)
+ {
+ rts = rra_timespans_custom;
+ rts_num = rra_timespans_custom_num;
+ }
+ else
+ {
+ rts = rra_timespans;
+ rts_num = rra_timespans_num;
+ }
+
+ rra_max = rts_num * rra_types_num;
+
+ if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
+ return (-1);
+ memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
+ rra_num = 0;
+
cdp_len = 0;
- for (i = 0; i < rra_timespans_num; i++)
+ for (i = 0; i < rts_num; i++)
{
- span = rra_timespans[i];
+ span = rts[i];
- if ((span / step) < rows)
+ if ((span / ss) < rrarows)
continue;
if (cdp_len == 0)
cdp_len = 1;
else
- cdp_len = (int) floor (((double) span) / ((double) (rows * step)));
+ cdp_len = (int) floor (((double) span)
+ / ((double) (rrarows * ss)));
- cdp_num = (int) ceil (((double) span) / ((double) (cdp_len * step)));
+ cdp_num = (int) ceil (((double) span)
+ / ((double) (cdp_len * ss)));
for (j = 0; j < rra_types_num; j++)
{
if (rra_num >= rra_max)
break;
- if (snprintf (buffer, sizeof(buffer), "RRA:%s:%3.1f:%u:%u",
- rra_types[j], COLLECTD_XFF,
+ if (snprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u",
+ rra_types[j], xff,
cdp_len, cdp_num) >= sizeof (buffer))
{
- syslog (LOG_ERR, "rra_get: Buffer would have been truncated.");
+ ERROR ("rra_get: Buffer would have been truncated.");
continue;
}
}
#if COLLECT_DEBUG
- DBG ("rra_num = %i", rra_num);
+ DEBUG ("rra_num = %i", rra_num);
for (i = 0; i < rra_num; i++)
- DBG (" %s", rra_def[i]);
+ DEBUG (" %s", rra_def[i]);
#endif
*ret = rra_def;
return (rra_num);
-}
+} /* int rra_get */
static void ds_free (int ds_num, char **ds_def)
{
free (ds_def);
}
-static int ds_get (char ***ret, const data_set_t *ds)
+static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
{
char **ds_def;
int ds_num;
char max[32];
char buffer[128];
- DBG ("ds->ds_num = %i", ds->ds_num);
+ DEBUG ("ds->ds_num = %i", ds->ds_num);
ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
if (ds_def == NULL)
{
- syslog (LOG_ERR, "rrdtool plugin: malloc failed: %s",
- strerror (errno));
+ char errbuf[1024];
+ ERROR ("rrdtool plugin: malloc failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
memset (ds_def, '\0', ds->ds_num * sizeof (char *));
type = "GAUGE";
else
{
- syslog (LOG_ERR, "rrdtool plugin: Unknown DS type: %i",
+ ERROR ("rrdtool plugin: Unknown DS type: %i",
d->type);
break;
}
- if (d->min == NAN)
+ if (isnan (d->min))
{
strcpy (min, "U");
}
min[sizeof (min) - 1] = '\0';
}
- if (d->max == NAN)
+ if (isnan (d->max))
{
strcpy (max, "U");
}
}
status = snprintf (buffer, sizeof (buffer),
- "DS:%s:%s:%s:%s:%s",
- d->name, type, COLLECTD_HEARTBEAT,
+ "DS:%s:%s:%i:%s:%s",
+ d->name, type,
+ (heartbeat > 0) ? heartbeat : (2 * vl->interval),
min, max);
if ((status < 1) || (status >= sizeof (buffer)))
break;
#if COLLECT_DEBUG
{
int i;
- DBG ("ds_num = %i", ds_num);
+ DEBUG ("ds_num = %i", ds_num);
for (i = 0; i < ds_num; i++)
- DBG (" %s", ds_def[i]);
+ DEBUG (" %s", ds_def[i]);
}
#endif
return (ds_num);
}
-static int rrd_create_file (char *filename, const data_set_t *ds)
+#if HAVE_THREADSAFE_LIBRRD
+static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
+ int argc, char **argv)
+{
+ int status;
+
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_create_r (filename, pdp_step, last_up, argc, argv);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s",
+ filename, rrd_get_error ());
+ }
+
+ return (status);
+} /* int srrd_create */
+
+static int srrd_update (char *filename, char *template, int argc, char **argv)
+{
+ int status;
+
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_update_r (filename, template, argc, argv);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
+ filename, rrd_get_error ());
+ }
+
+ return (status);
+} /* int srrd_update */
+/* #endif HAVE_THREADSAFE_LIBRRD */
+
+#else /* !HAVE_THREADSAFE_LIBRRD */
+static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
+ int argc, char **argv)
+{
+ int status;
+
+ int new_argc;
+ char **new_argv;
+
+ char pdp_step_str[16];
+ char last_up_str[16];
+
+ new_argc = 6 + argc;
+ new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
+ if (new_argv == NULL)
+ {
+ ERROR ("rrdtool plugin: malloc failed.");
+ return (-1);
+ }
+
+ if (last_up == 0)
+ last_up = time (NULL) - 10;
+
+ snprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step);
+ pdp_step_str[sizeof (pdp_step_str) - 1] = '\0';
+ snprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up);
+ last_up_str[sizeof (last_up_str) - 1] = '\0';
+
+ new_argv[0] = "create";
+ new_argv[1] = filename;
+ new_argv[2] = "-s";
+ new_argv[3] = pdp_step_str;
+ new_argv[4] = "-b";
+ new_argv[5] = last_up_str;
+
+ memcpy (new_argv + 6, argv, argc * sizeof (char *));
+ new_argv[new_argc] = NULL;
+
+ pthread_mutex_lock (&librrd_lock);
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_create (new_argc, new_argv);
+ pthread_mutex_unlock (&librrd_lock);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_create (%s) failed: %s",
+ filename, rrd_get_error ());
+ }
+
+ sfree (new_argv);
+
+ return (status);
+} /* int srrd_create */
+
+static int srrd_update (char *filename, char *template, int argc, char **argv)
+{
+ int status;
+
+ int new_argc;
+ char **new_argv;
+
+ assert (template == NULL);
+
+ new_argc = 2 + argc;
+ new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
+ if (new_argv == NULL)
+ {
+ ERROR ("rrdtool plugin: malloc failed.");
+ return (-1);
+ }
+
+ new_argv[0] = "update";
+ new_argv[1] = filename;
+
+ memcpy (new_argv + 2, argv, argc * sizeof (char *));
+ new_argv[new_argc] = NULL;
+
+ pthread_mutex_lock (&librrd_lock);
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+
+ status = rrd_update (new_argc, new_argv);
+ pthread_mutex_unlock (&librrd_lock);
+
+ if (status != 0)
+ {
+ WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
+ argv[1], rrd_get_error ());
+ }
+
+ sfree (new_argv);
+
+ return (status);
+} /* int srrd_update */
+#endif /* !HAVE_THREADSAFE_LIBRRD */
+
+static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
{
char **argv;
int argc;
int rra_num;
char **ds_def;
int ds_num;
- int i, j;
int status = 0;
if (check_create_dir (filename))
return (-1);
- if ((rra_num = rra_get (&rra_def)) < 1)
+ if ((rra_num = rra_get (&rra_def, vl)) < 1)
{
- syslog (LOG_ERR, "rrd_create_file failed: Could not calculate RRAs");
+ ERROR ("rrd_create_file failed: Could not calculate RRAs");
return (-1);
}
- if ((ds_num = ds_get (&ds_def, ds)) < 1)
+ if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
{
- syslog (LOG_ERR, "rrd_create_file failed: Could not calculate DSes");
+ ERROR ("rrd_create_file failed: Could not calculate DSes");
return (-1);
}
- argc = ds_num + rra_num + 4;
+ argc = ds_num + rra_num;
if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
{
- syslog (LOG_ERR, "rrd_create failed: %s", strerror (errno));
+ char errbuf[1024];
+ ERROR ("rrd_create failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
- argv[0] = "create";
- argv[1] = filename;
- argv[2] = "-s";
- argv[3] = COLLECTD_STEP;
-
- j = 4;
- for (i = 0; i < ds_num; i++)
- argv[j++] = ds_def[i];
- for (i = 0; i < rra_num; i++)
- argv[j++] = rra_def[i];
- argv[j] = NULL;
+ memcpy (argv, ds_def, ds_num * sizeof (char *));
+ memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *));
+ argv[ds_num + rra_num] = NULL;
- optind = 0; /* bug in librrd? */
- rrd_clear_error ();
- if (rrd_create (argc, argv) == -1)
- {
- syslog (LOG_ERR, "rrd_create failed: %s: %s", filename, rrd_get_error ());
- status = -1;
- }
+ assert (vl->time > 10);
+ status = srrd_create (filename,
+ (stepsize > 0) ? stepsize : vl->interval,
+ vl->time - 10,
+ argc, argv);
free (argv);
ds_free (ds_num, ds_def);
+ rra_free (rra_num, rra_def);
return (status);
}
int status;
int i;
- memset (buffer, '\0', sizeof (buffer_len));
+ memset (buffer, '\0', buffer_len);
status = snprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
if ((status < 1) || (status >= buffer_len))
int offset = 0;
int status;
+ if (datadir != NULL)
+ {
+ status = snprintf (buffer + offset, buffer_len - offset,
+ "%s/", datadir);
+ if ((status < 1) || (status >= buffer_len - offset))
+ return (-1);
+ offset += status;
+ }
+
status = snprintf (buffer + offset, buffer_len - offset,
"%s/", vl->host);
if ((status < 1) || (status >= buffer_len - offset))
return (0);
} /* int value_list_to_filename */
-static rrd_cache_t *rrd_cache_insert (const char *filename,
- const char *value)
+static void *rrd_queue_thread (void *data)
{
- rrd_cache_t *rc = NULL;
- llentry_t *le = NULL;
-
- if (cache != NULL)
- {
- le = llist_search (cache, filename);
- if (le != NULL)
- rc = (rrd_cache_t *) le->value;
- }
-
- if (rc == NULL)
- {
- rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
- if (rc == NULL)
- return (NULL);
- rc->values_num = 0;
- rc->values = NULL;
- rc->first_value = 0;
- }
-
- rc->values = (char **) realloc ((void *) rc->values,
- (rc->values_num + 1) * sizeof (char *));
- if (rc->values == NULL)
+ while (42)
{
- syslog (LOG_ERR, "rrdtool plugin: realloc failed: %s",
- strerror (errno));
- free (rc);
- if (le != NULL)
+ rrd_queue_t *queue_entry;
+ rrd_cache_t *cache_entry;
+ char **values;
+ int values_num;
+ int i;
+
+ /* XXX: If you need to lock both, cache_lock and queue_lock, at
+ * the same time, ALWAYS lock `cache_lock' first! */
+
+ /* wait until an entry is available */
+ pthread_mutex_lock (&queue_lock);
+ while ((queue_head == NULL) && (do_shutdown == 0))
+ pthread_cond_wait (&queue_cond, &queue_lock);
+
+ /* We're in the shutdown phase */
+ if (queue_head == NULL)
{
- llist_remove (cache, le);
- llentry_destroy (le);
+ pthread_mutex_unlock (&queue_lock);
+ break;
}
- return (NULL);
- }
- rc->values[rc->values_num] = strdup (value);
- if (rc->values[rc->values_num] != NULL)
- rc->values_num++;
+ /* Dequeue the first entry */
+ queue_entry = queue_head;
+ if (queue_head == queue_tail)
+ queue_head = queue_tail = NULL;
+ else
+ queue_head = queue_head->next;
- if (rc->values_num == 1)
- rc->first_value = time (NULL);
+ /* Unlock the queue again */
+ pthread_mutex_unlock (&queue_lock);
- if ((cache != NULL) && (le == NULL))
- {
- le = llentry_create (filename, (void *) rc);
- if (le != NULL)
- llist_prepend (cache, le);
- }
+ /* We now need the cache lock so the entry isn't updated while
+ * we make a copy of it's values */
+ pthread_mutex_lock (&cache_lock);
- DBG ("rrd_cache_insert (%s, %s) = %p", filename, value, (void *) rc);
+ c_avl_get (cache, queue_entry->filename, (void *) &cache_entry);
- return (rc);
-} /* rrd_cache_t *rrd_cache_insert */
+ values = cache_entry->values;
+ values_num = cache_entry->values_num;
-static int rrd_write_cache_entry (const char *filename, rrd_cache_t *rc)
-{
- char **argv;
- int argc;
+ cache_entry->values = NULL;
+ cache_entry->values_num = 0;
+ cache_entry->flags = FLAG_NONE;
- char *fn;
- int status;
+ pthread_mutex_unlock (&cache_lock);
- argc = rc->values_num + 2;
- argv = (char **) malloc ((argc + 1) * sizeof (char *));
- if (argv == NULL)
- return (-1);
+ /* Write the values to the RRD-file */
+ srrd_update (queue_entry->filename, NULL, values_num, values);
+ DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
+ values_num, queue_entry->filename);
- fn = strdup (filename);
- if (fn == NULL)
- {
- free (argv);
- return (-1);
- }
+ for (i = 0; i < values_num; i++)
+ {
+ sfree (values[i]);
+ }
+ sfree (values);
+ sfree (queue_entry->filename);
+ sfree (queue_entry);
+ } /* while (42) */
- argv[0] = "update";
- argv[1] = fn;
- memcpy (argv + 2, rc->values, rc->values_num * sizeof (char *));
- argv[argc] = NULL;
+ pthread_mutex_lock (&cache_lock);
+ c_avl_destroy (cache);
+ cache = NULL;
+ pthread_mutex_unlock (&cache_lock);
- DBG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv);
+ pthread_exit ((void *) 0);
+ return ((void *) 0);
+} /* void *rrd_queue_thread */
- optind = 0; /* bug in librrd? */
- rrd_clear_error ();
- status = rrd_update (argc, argv);
-
- free (argv);
- free (fn);
+static int rrd_queue_cache_entry (const char *filename)
+{
+ rrd_queue_t *queue_entry;
- free (rc->values);
- rc->values = NULL;
- rc->values_num = 0;
+ queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
+ if (queue_entry == NULL)
+ return (-1);
- if (status != 0)
+ queue_entry->filename = strdup (filename);
+ if (queue_entry->filename == NULL)
{
- syslog (LOG_WARNING, "rrd_update failed: %s: %s",
- filename, rrd_get_error ());
+ free (queue_entry);
return (-1);
}
+ queue_entry->next = NULL;
+
+ pthread_mutex_lock (&queue_lock);
+ if (queue_tail == NULL)
+ queue_head = queue_entry;
+ else
+ queue_tail->next = queue_entry;
+ queue_tail = queue_entry;
+ pthread_cond_signal (&queue_cond);
+ pthread_mutex_unlock (&queue_lock);
+
+ DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
+
return (0);
-} /* int rrd_update_file */
+} /* int rrd_queue_cache_entry */
static void rrd_cache_flush (int timeout)
{
- llentry_t *le;
rrd_cache_t *rc;
time_t now;
- if (cache == NULL)
- return;
+ char **keys = NULL;
+ int keys_num = 0;
+
+ char *key;
+ c_avl_iterator_t *iter;
+ int i;
- DBG ("Flushing cache, timeout = %i", timeout);
+ DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
now = time (NULL);
- /* Remove empty entries */
- le = llist_head (cache);
- while (le != NULL)
+ /* Build a list of entries to be flushed */
+ iter = c_avl_get_iterator (cache);
+ while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
+ {
+ if (rc->flags == FLAG_QUEUED)
+ continue;
+ else if ((now - rc->first_value) < timeout)
+ continue;
+ else if (rc->values_num > 0)
+ {
+ if (rrd_queue_cache_entry (key) == 0)
+ rc->flags = FLAG_QUEUED;
+ }
+ else /* ancient and no values -> waste of memory */
+ {
+ char **tmp = (char **) realloc ((void *) keys,
+ (keys_num + 1) * sizeof (char *));
+ if (tmp == NULL)
+ {
+ char errbuf[1024];
+ ERROR ("rrdtool plugin: "
+ "realloc failed: %s",
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
+ c_avl_iterator_destroy (iter);
+ sfree (keys);
+ return;
+ }
+ keys = tmp;
+ keys[keys_num] = key;
+ keys_num++;
+ }
+ } /* while (c_avl_iterator_next) */
+ c_avl_iterator_destroy (iter);
+
+ for (i = 0; i < keys_num; i++)
{
- llentry_t *next = le->next;
- rc = (rrd_cache_t *) le->value;
- if (rc->values_num == 0)
+ if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
{
- DBG ("Removing cache entry for `%s'", le->key);
- free (rc->values);
- free (rc);
- llist_remove (cache, le);
+ DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
+ continue;
}
- le = next;
+
+ assert (rc->values == NULL);
+ assert (rc->values_num == 0);
+
+ sfree (rc);
+ sfree (key);
+ keys[i] = NULL;
+ } /* for (i = 0..keys_num) */
+
+ sfree (keys);
+
+ cache_flush_last = now;
+} /* void rrd_cache_flush */
+
+static int rrd_cache_insert (const char *filename,
+ const char *value, time_t value_time)
+{
+ rrd_cache_t *rc = NULL;
+ int new_rc = 0;
+ char **values_new;
+
+ pthread_mutex_lock (&cache_lock);
+
+ c_avl_get (cache, filename, (void *) &rc);
+
+ if (rc == NULL)
+ {
+ rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
+ if (rc == NULL)
+ return (-1);
+ rc->values_num = 0;
+ rc->values = NULL;
+ rc->first_value = 0;
+ rc->last_value = 0;
+ rc->flags = FLAG_NONE;
+ new_rc = 1;
+ }
+
+ if (rc->last_value >= value_time)
+ {
+ pthread_mutex_unlock (&cache_lock);
+ WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
+ (unsigned int) rc->last_value,
+ (unsigned int) value_time);
+ return (-1);
}
- /* Write timed out entries */
- le = llist_head (cache);
- while (le != NULL)
+ values_new = (char **) realloc ((void *) rc->values,
+ (rc->values_num + 1) * sizeof (char *));
+ if (values_new == NULL)
{
- rc = (rrd_cache_t *) le->value;
- if ((now - rc->first_value) >= timeout)
- rrd_write_cache_entry (le->key, rc);
+ char errbuf[1024];
+ void *cache_key = NULL;
+
+ sstrerror (errno, errbuf, sizeof (errbuf));
+
+ c_avl_remove (cache, filename, &cache_key, NULL);
+ pthread_mutex_unlock (&cache_lock);
+
+ ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
- le = le->next;
+ sfree (cache_key);
+ sfree (rc->values);
+ sfree (rc);
+ return (-1);
}
+ rc->values = values_new;
- cache_flush = now;
-} /* void rrd_cache_flush */
+ rc->values[rc->values_num] = strdup (value);
+ if (rc->values[rc->values_num] != NULL)
+ rc->values_num++;
+
+ if (rc->values_num == 1)
+ rc->first_value = value_time;
+ rc->last_value = value_time;
+
+ /* Insert if this is the first value */
+ if (new_rc == 1)
+ {
+ void *cache_key = strdup (filename);
+
+ if (cache_key == NULL)
+ {
+ char errbuf[1024];
+ sstrerror (errno, errbuf, sizeof (errbuf));
+
+ pthread_mutex_unlock (&cache_lock);
+
+ ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
+
+ sfree (rc->values[0]);
+ sfree (rc->values);
+ sfree (rc);
+ return (-1);
+ }
+
+ c_avl_insert (cache, cache_key, rc);
+ }
+
+ DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
+ "values_num = %i; age = %u;",
+ filename, rc->values_num,
+ rc->last_value - rc->first_value);
+
+ if ((rc->last_value - rc->first_value) >= cache_timeout)
+ {
+ /* XXX: If you need to lock both, cache_lock and queue_lock, at
+ * the same time, ALWAYS lock `cache_lock' first! */
+ if (rc->flags != FLAG_QUEUED)
+ {
+ if (rrd_queue_cache_entry (filename) == 0)
+ rc->flags = FLAG_QUEUED;
+ }
+ else
+ {
+ DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
+ }
+ }
+
+ if ((cache_timeout > 0) &&
+ ((time (NULL) - cache_flush_last) > cache_flush_timeout))
+ rrd_cache_flush (cache_flush_timeout);
+
+
+ pthread_mutex_unlock (&cache_lock);
+
+ return (0);
+} /* int rrd_cache_insert */
static int rrd_write (const data_set_t *ds, const value_list_t *vl)
{
struct stat statbuf;
char filename[512];
char values[512];
- rrd_cache_t *rc;
- time_t now;
+ int status;
if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
return (-1);
{
if (errno == ENOENT)
{
- if (rrd_create_file (filename, ds))
+ if (rrd_create_file (filename, ds, vl))
return (-1);
}
else
{
- syslog (LOG_ERR, "stat(%s) failed: %s",
- filename, strerror (errno));
+ char errbuf[1024];
+ ERROR ("stat(%s) failed: %s", filename,
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
return (-1);
}
}
else if (!S_ISREG (statbuf.st_mode))
{
- syslog (LOG_ERR, "stat(%s): Not a regular file!",
+ ERROR ("stat(%s): Not a regular file!",
filename);
return (-1);
}
- rc = rrd_cache_insert (filename, values);
- if (rc == NULL)
- return (-1);
-
- if (cache == NULL)
- {
- rrd_write_cache_entry (filename, rc);
- free (rc->values);
- free (rc);
- return (0);
- }
-
- now = time (NULL);
-
- DBG ("age (%s) = %i", filename, now - rc->first_value);
-
- if ((now - rc->first_value) >= cache_timeout)
- rrd_write_cache_entry (filename, rc);
-
- if ((time (NULL) - cache_flush) >= cache_timeout)
- {
- rrd_cache_flush (cache_timeout);
- }
+ status = rrd_cache_insert (filename, values, vl->time);
- return (0);
-} /* int rrd_dispatch */
+ return (status);
+} /* int rrd_write */
-static int rrd_config (const char *key, const char *val)
+static int rrd_config (const char *key, const char *value)
{
if (strcasecmp ("CacheTimeout", key) == 0)
{
- int tmp = atoi (val);
+ int tmp = atoi (value);
if (tmp < 0)
{
fprintf (stderr, "rrdtool: `CacheTimeout' must "
}
cache_timeout = tmp;
}
+ else if (strcasecmp ("CacheFlush", key) == 0)
+ {
+ int tmp = atoi (value);
+ if (tmp < 0)
+ {
+ fprintf (stderr, "rrdtool: `CacheFlush' must "
+ "be greater than 0.\n");
+ return (1);
+ }
+ cache_flush_timeout = tmp;
+ }
+ else if (strcasecmp ("DataDir", key) == 0)
+ {
+ if (datadir != NULL)
+ free (datadir);
+ datadir = strdup (value);
+ if (datadir != NULL)
+ {
+ int len = strlen (datadir);
+ while ((len > 0) && (datadir[len - 1] == '/'))
+ {
+ len--;
+ datadir[len] = '\0';
+ }
+ if (len <= 0)
+ {
+ free (datadir);
+ datadir = NULL;
+ }
+ }
+ }
+ else if (strcasecmp ("StepSize", key) == 0)
+ {
+ stepsize = atoi (value);
+ if (stepsize < 0)
+ stepsize = 0;
+ }
+ else if (strcasecmp ("HeartBeat", key) == 0)
+ {
+ heartbeat = atoi (value);
+ if (heartbeat < 0)
+ heartbeat = 0;
+ }
+ else if (strcasecmp ("RRARows", key) == 0)
+ {
+ int tmp = atoi (value);
+ if (tmp <= 0)
+ {
+ fprintf (stderr, "rrdtool: `RRARows' must "
+ "be greater than 0.\n");
+ return (1);
+ }
+ rrarows = tmp;
+ }
+ else if (strcasecmp ("RRATimespan", key) == 0)
+ {
+ char *saveptr = NULL;
+ char *dummy;
+ char *ptr;
+ char *value_copy;
+ int *tmp_alloc;
+
+ value_copy = strdup (value);
+ if (value_copy == NULL)
+ return (1);
+
+ dummy = value_copy;
+ while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
+ {
+ dummy = NULL;
+
+ tmp_alloc = realloc (rra_timespans_custom,
+ sizeof (int) * (rra_timespans_custom_num + 1));
+ if (tmp_alloc == NULL)
+ {
+ fprintf (stderr, "rrdtool: realloc failed.\n");
+ free (value_copy);
+ return (1);
+ }
+ rra_timespans_custom = tmp_alloc;
+ rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr);
+ if (rra_timespans_custom[rra_timespans_custom_num] != 0)
+ rra_timespans_custom_num++;
+ } /* while (strtok_r) */
+ free (value_copy);
+ }
+ else if (strcasecmp ("XFF", key) == 0)
+ {
+ double tmp = atof (value);
+ if ((tmp < 0.0) || (tmp >= 1.0))
+ {
+ fprintf (stderr, "rrdtool: `XFF' must "
+ "be in the range 0 to 1 (exclusive).");
+ return (1);
+ }
+ xff = tmp;
+ }
else
{
return (-1);
static int rrd_shutdown (void)
{
+ pthread_mutex_lock (&cache_lock);
rrd_cache_flush (-1);
+ pthread_mutex_unlock (&cache_lock);
+
+ pthread_mutex_lock (&queue_lock);
+ do_shutdown = 1;
+ pthread_cond_signal (&queue_cond);
+ pthread_mutex_unlock (&queue_lock);
+
+ /* Wait for all the values to be written to disk before returning. */
+ if (queue_thread != 0)
+ {
+ pthread_join (queue_thread, NULL);
+ queue_thread = 0;
+ DEBUG ("rrdtool plugin: queue_thread exited.");
+ }
return (0);
} /* int rrd_shutdown */
static int rrd_init (void)
{
+ int status;
+
+ if (stepsize < 0)
+ stepsize = 0;
+ if (heartbeat <= 0)
+ {
+ if (stepsize > 0)
+ heartbeat = 2 * stepsize;
+ else
+ heartbeat = 0;
+ }
+
+ if ((heartbeat > 0) && (heartbeat < interval_g))
+ WARNING ("rrdtool plugin: Your `heartbeat' is "
+ "smaller than your `interval'. This will "
+ "likely cause problems.");
+ else if ((stepsize > 0) && (stepsize < interval_g))
+ WARNING ("rrdtool plugin: Your `stepsize' is "
+ "smaller than your `interval'. This will "
+ "create needlessly big RRD-files.");
+
+ /* Set the cache up */
+ pthread_mutex_lock (&cache_lock);
+
+ cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
+ if (cache == NULL)
+ {
+ ERROR ("rrdtool plugin: c_avl_create failed.");
+ return (-1);
+ }
+
+ cache_flush_last = time (NULL);
if (cache_timeout < 2)
{
cache_timeout = 0;
+ cache_flush_timeout = 0;
}
- else
+ else if (cache_flush_timeout < cache_timeout)
+ cache_flush_timeout = 10 * cache_timeout;
+
+ pthread_mutex_unlock (&cache_lock);
+
+ status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
+ if (status != 0)
{
- cache = llist_create ();
- cache_flush = time (NULL);
- plugin_register_shutdown ("rrdtool", rrd_shutdown);
+ ERROR ("rrdtool plugin: Cannot create queue-thread.");
+ return (-1);
}
+
+ DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
+ " heartbeat = %i; rrarows = %i; xff = %lf;",
+ (datadir == NULL) ? "(null)" : datadir,
+ stepsize, heartbeat, rrarows, xff);
+
return (0);
} /* int rrd_init */
config_keys, config_keys_num);
plugin_register_init ("rrdtool", rrd_init);
plugin_register_write ("rrdtool", rrd_write);
+ plugin_register_shutdown ("rrdtool", rrd_shutdown);
}