X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdtool.c;h=b366a9c64166eeeae135f700b069e81578cac28a;hb=83077c18c3e78739c2d2d18debf99875944eaa72;hp=986c8b41bed0fd5d9ff3f83c96fdd2a34206a939;hpb=30220fc97cad1717a2a3ce3ad03286ef1e6e29f3;p=collectd.git diff --git a/src/rrdtool.c b/src/rrdtool.c index 986c8b41..b366a9c6 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -1,6 +1,8 @@ /** * collectd - src/rrdtool.c * Copyright (C) 2006-2008 Florian octo Forster + * Copyright (C) 2008-2008 Sebastian Harl + * Copyright (C) 2009 Mariusz Gronczewski * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -17,12 +19,15 @@ * * Authors: * Florian octo Forster + * Sebastian Harl + * Mariusz Gronczewski **/ #include "collectd.h" #include "plugin.h" #include "common.h" #include "utils_avltree.h" +#include "utils_rrdcreate.h" #include @@ -35,10 +40,11 @@ */ struct rrd_cache_s { - int values_num; - char **values; - time_t first_value; - time_t last_value; + int values_num; + char **values; + cdtime_t first_value; + cdtime_t last_value; + int64_t random_variation; enum { FLAG_NONE = 0x00, @@ -65,27 +71,6 @@ typedef struct rrd_queue_s rrd_queue_t; /* * Private variables */ -static int rra_timespans[] = -{ - 3600, - 86400, - 604800, - 2678400, - 31622400 -}; -static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans); - -static int *rra_timespans_custom = NULL; -static int rra_timespans_custom_num = 0; - -static char *rra_types[] = -{ - "AVERAGE", - "MIN", - "MAX" -}; -static int rra_types_num = STATIC_ARRAY_SIZE (rra_types); - static const char *config_keys[] = { "CacheTimeout", @@ -96,25 +81,36 @@ static const char *config_keys[] = "RRARows", "RRATimespan", "XFF", - "WritesPerSecond" + "WritesPerSecond", + "RandomTimeout" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat * is zero a default, depending on the `interval' member of the value list is * being used. */ -static char *datadir = NULL; -static int stepsize = 0; -static int heartbeat = 0; -static int rrarows = 1200; -static double xff = 0.1; -static double write_rate = 0.0; +static char *datadir = NULL; +static double write_rate = 0.0; +static rrdcreate_config_t rrdcreate_config = +{ + /* stepsize = */ 0, + /* heartbeat = */ 0, + /* rrarows = */ 1200, + /* xff = */ 0.1, + + /* timespans = */ NULL, + /* timespans_num = */ 0, + + /* consolidation_functions = */ NULL, + /* consolidation_functions_num = */ 0 +}; /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time, * ALWAYS lock `cache_lock' first! */ -static int cache_timeout = 0; -static int cache_flush_timeout = 0; -static time_t cache_flush_last; +static cdtime_t cache_timeout = 0; +static cdtime_t cache_flush_timeout = 0; +static cdtime_t random_timeout = TIME_T_TO_CDTIME_T (1); +static cdtime_t cache_flush_last; static c_avl_tree_t *cache = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; @@ -122,7 +118,8 @@ static rrd_queue_t *queue_head = NULL; static rrd_queue_t *queue_tail = NULL; static rrd_queue_t *flushq_head = NULL; static rrd_queue_t *flushq_tail = NULL; -static pthread_t queue_thread = 0; +static pthread_t queue_thread; +static int queue_thread_running = 1; static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -132,233 +129,7 @@ static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER; static int do_shutdown = 0; -/* * * * * * * * * * - * WARNING: Magic * - * * * * * * * * * */ - -static void rra_free (int rra_num, char **rra_def) -{ - int i; - - for (i = 0; i < rra_num; i++) - { - sfree (rra_def[i]); - } - sfree (rra_def); -} /* void rra_free */ - -static int rra_get (char ***ret, const value_list_t *vl) -{ - char **rra_def; - int rra_num; - - int *rts; - int rts_num; - - int rra_max; - - int span; - - int cdp_num; - int cdp_len; - int i, j; - - char buffer[64]; - - /* The stepsize we use here: If it is user-set, use it. If not, use the - * interval of the value-list. */ - int ss; - - if (rrarows <= 0) - { - *ret = NULL; - return (-1); - } - - ss = (stepsize > 0) ? stepsize : vl->interval; - if (ss <= 0) - { - *ret = NULL; - return (-1); - } - - /* Use the configured timespans or fall back to the built-in defaults */ - if (rra_timespans_custom_num != 0) - { - rts = rra_timespans_custom; - rts_num = rra_timespans_custom_num; - } - else - { - rts = rra_timespans; - rts_num = rra_timespans_num; - } - - rra_max = rts_num * rra_types_num; - - if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL) - return (-1); - memset (rra_def, '\0', (rra_max + 1) * sizeof (char *)); - rra_num = 0; - - cdp_len = 0; - for (i = 0; i < rts_num; i++) - { - span = rts[i]; - - if ((span / ss) < rrarows) - span = ss * rrarows; - - if (cdp_len == 0) - cdp_len = 1; - else - cdp_len = (int) floor (((double) span) - / ((double) (rrarows * ss))); - - cdp_num = (int) ceil (((double) span) - / ((double) (cdp_len * ss))); - - for (j = 0; j < rra_types_num; j++) - { - if (rra_num >= rra_max) - break; - - if (ssnprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u", - rra_types[j], xff, - cdp_len, cdp_num) >= sizeof (buffer)) - { - ERROR ("rra_get: Buffer would have been truncated."); - continue; - } - - rra_def[rra_num++] = sstrdup (buffer); - } - } - -#if COLLECT_DEBUG - DEBUG ("rra_num = %i", rra_num); - for (i = 0; i < rra_num; i++) - DEBUG (" %s", rra_def[i]); -#endif - - *ret = rra_def; - return (rra_num); -} /* int rra_get */ - -static void ds_free (int ds_num, char **ds_def) -{ - int i; - - for (i = 0; i < ds_num; i++) - if (ds_def[i] != NULL) - free (ds_def[i]); - free (ds_def); -} - -static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl) -{ - char **ds_def; - int ds_num; - - char min[32]; - char max[32]; - char buffer[128]; - - DEBUG ("ds->ds_num = %i", ds->ds_num); - - ds_def = (char **) malloc (ds->ds_num * sizeof (char *)); - if (ds_def == NULL) - { - char errbuf[1024]; - ERROR ("rrdtool plugin: malloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return (-1); - } - memset (ds_def, '\0', ds->ds_num * sizeof (char *)); - - for (ds_num = 0; ds_num < ds->ds_num; ds_num++) - { - data_source_t *d = ds->ds + ds_num; - char *type; - int status; - - ds_def[ds_num] = NULL; - - if (d->type == DS_TYPE_COUNTER) - type = "COUNTER"; - else if (d->type == DS_TYPE_GAUGE) - type = "GAUGE"; - else - { - ERROR ("rrdtool plugin: Unknown DS type: %i", - d->type); - break; - } - - if (isnan (d->min)) - { - sstrncpy (min, "U", sizeof (min)); - } - else - ssnprintf (min, sizeof (min), "%lf", d->min); - - if (isnan (d->max)) - { - sstrncpy (max, "U", sizeof (max)); - } - else - ssnprintf (max, sizeof (max), "%lf", d->max); - - status = ssnprintf (buffer, sizeof (buffer), - "DS:%s:%s:%i:%s:%s", - d->name, type, - (heartbeat > 0) ? heartbeat : (2 * vl->interval), - min, max); - if ((status < 1) || (status >= sizeof (buffer))) - break; - - ds_def[ds_num] = sstrdup (buffer); - } /* for ds_num = 0 .. ds->ds_num */ - -#if COLLECT_DEBUG -{ - int i; - DEBUG ("ds_num = %i", ds_num); - for (i = 0; i < ds_num; i++) - DEBUG (" %s", ds_def[i]); -} -#endif - - if (ds_num != ds->ds_num) - { - ds_free (ds_num, ds_def); - return (-1); - } - - *ret = ds_def; - return (ds_num); -} - #if HAVE_THREADSAFE_LIBRRD -static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, - int argc, const char **argv) -{ - int status; - - optind = 0; /* bug in librrd? */ - rrd_clear_error (); - - status = rrd_create_r (filename, pdp_step, last_up, argc, (void *) argv); - - if (status != 0) - { - WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s", - filename, rrd_get_error ()); - } - - return (status); -} /* int srrd_create */ - static int srrd_update (char *filename, char *template, int argc, const char **argv) { @@ -380,59 +151,6 @@ static int srrd_update (char *filename, char *template, /* #endif HAVE_THREADSAFE_LIBRRD */ #else /* !HAVE_THREADSAFE_LIBRRD */ -static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, - int argc, const char **argv) -{ - int status; - - int new_argc; - char **new_argv; - - char pdp_step_str[16]; - char last_up_str[16]; - - new_argc = 6 + argc; - new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *)); - if (new_argv == NULL) - { - ERROR ("rrdtool plugin: malloc failed."); - return (-1); - } - - if (last_up == 0) - last_up = time (NULL) - 10; - - ssnprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step); - ssnprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up); - - new_argv[0] = "create"; - new_argv[1] = filename; - new_argv[2] = "-s"; - new_argv[3] = pdp_step_str; - new_argv[4] = "-b"; - new_argv[5] = last_up_str; - - memcpy (new_argv + 6, argv, argc * sizeof (char *)); - new_argv[new_argc] = NULL; - - pthread_mutex_lock (&librrd_lock); - optind = 0; /* bug in librrd? */ - rrd_clear_error (); - - status = rrd_create (new_argc, new_argv); - pthread_mutex_unlock (&librrd_lock); - - if (status != 0) - { - WARNING ("rrdtool plugin: rrd_create (%s) failed: %s", - filename, rrd_get_error ()); - } - - sfree (new_argv); - - return (status); -} /* int srrd_create */ - static int srrd_update (char *filename, char *template, int argc, const char **argv) { @@ -476,68 +194,18 @@ static int srrd_update (char *filename, char *template, } /* int srrd_update */ #endif /* !HAVE_THREADSAFE_LIBRRD */ -static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl) -{ - char **argv; - int argc; - char **rra_def; - int rra_num; - char **ds_def; - int ds_num; - int status = 0; - - if (check_create_dir (filename)) - return (-1); - - if ((rra_num = rra_get (&rra_def, vl)) < 1) - { - ERROR ("rrd_create_file failed: Could not calculate RRAs"); - return (-1); - } - - if ((ds_num = ds_get (&ds_def, ds, vl)) < 1) - { - ERROR ("rrd_create_file failed: Could not calculate DSes"); - return (-1); - } - - argc = ds_num + rra_num; - - if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL) - { - char errbuf[1024]; - ERROR ("rrd_create failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return (-1); - } - - memcpy (argv, ds_def, ds_num * sizeof (char *)); - memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *)); - argv[ds_num + rra_num] = NULL; - - assert (vl->time > 10); - status = srrd_create (filename, - (stepsize > 0) ? stepsize : vl->interval, - vl->time - 10, - argc, (const char **)argv); - - free (argv); - ds_free (ds_num, ds_def); - rra_free (rra_num, rra_def); - - return (status); -} - static int value_list_to_string (char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl) { int offset; int status; + time_t tt; int i; memset (buffer, '\0', buffer_len); - status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time); + tt = CDTIME_T_TO_TIME_T (vl->time); + status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) tt); if ((status < 1) || (status >= buffer_len)) return (-1); offset = status; @@ -545,15 +213,23 @@ static int value_list_to_string (char *buffer, int buffer_len, for (i = 0; i < ds->ds_num; i++) { if ((ds->ds[i].type != DS_TYPE_COUNTER) - && (ds->ds[i].type != DS_TYPE_GAUGE)) + && (ds->ds[i].type != DS_TYPE_GAUGE) + && (ds->ds[i].type != DS_TYPE_DERIVE) + && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) return (-1); if (ds->ds[i].type == DS_TYPE_COUNTER) status = ssnprintf (buffer + offset, buffer_len - offset, ":%llu", vl->values[i].counter); - else + else if (ds->ds[i].type == DS_TYPE_GAUGE) status = ssnprintf (buffer + offset, buffer_len - offset, ":%lf", vl->values[i].gauge); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + status = ssnprintf (buffer + offset, buffer_len - offset, + ":%"PRIi64, vl->values[i].derive); + else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ + status = ssnprintf (buffer + offset, buffer_len - offset, + ":%"PRIu64, vl->values[i].absolute); if ((status < 1) || (status >= (buffer_len - offset))) return (-1); @@ -565,7 +241,7 @@ static int value_list_to_string (char *buffer, int buffer_len, } /* int value_list_to_string */ static int value_list_to_filename (char *buffer, int buffer_len, - const data_set_t *ds, const value_list_t *vl) + const data_set_t __attribute__((unused)) *ds, const value_list_t *vl) { int offset = 0; int status; @@ -608,7 +284,7 @@ static int value_list_to_filename (char *buffer, int buffer_len, return (0); } /* int value_list_to_filename */ -static void *rrd_queue_thread (void *data) +static void *rrd_queue_thread (void __attribute__((unused)) *data) { struct timeval tv_next_update; struct timeval tv_now; @@ -629,7 +305,7 @@ static void *rrd_queue_thread (void *data) pthread_mutex_lock (&queue_lock); /* Wait for values to arrive */ - while (true) + while (42) { struct timespec ts_wait; @@ -653,10 +329,9 @@ static void *rrd_queue_thread (void *data) break; gettimeofday (&tv_now, /* timezone = */ NULL); - status = timeval_sub_timespec (&tv_next_update, &tv_now, - &ts_wait); + status = timeval_cmp (tv_next_update, tv_now, NULL); /* We're good to go */ - if (status != 0) + if (status <= 0) break; /* We're supposed to wait a bit with this update, so we'll @@ -669,7 +344,7 @@ static void *rrd_queue_thread (void *data) &ts_wait); if (status == ETIMEDOUT) break; - } /* while (true) */ + } /* while (42) */ /* XXX: If you need to lock both, cache_lock and queue_lock, at * the same time, ALWAYS lock `cache_lock' first! */ @@ -746,8 +421,9 @@ static void *rrd_queue_thread (void *data) /* Write the values to the RRD-file */ srrd_update (queue_entry->filename, NULL, values_num, (const char **)values); - DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s", - values_num, queue_entry->filename); + DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s", + values_num, (values_num == 1) ? "" : "s", + queue_entry->filename); for (i = 0; i < values_num; i++) { @@ -758,11 +434,6 @@ static void *rrd_queue_thread (void *data) sfree (queue_entry); } /* while (42) */ - pthread_mutex_lock (&cache_lock); - c_avl_destroy (cache); - cache = NULL; - pthread_mutex_unlock (&cache_lock); - pthread_exit ((void *) 0); return ((void *) 0); } /* void *rrd_queue_thread */ @@ -841,10 +512,11 @@ static int rrd_queue_dequeue (const char *filename, return (0); } /* int rrd_queue_dequeue */ -static void rrd_cache_flush (int timeout) +/* XXX: You must hold "cache_lock" when calling this function! */ +static void rrd_cache_flush (cdtime_t timeout) { rrd_cache_t *rc; - time_t now; + cdtime_t now; char **keys = NULL; int keys_num = 0; @@ -853,9 +525,11 @@ static void rrd_cache_flush (int timeout) c_avl_iterator_t *iter; int i; - DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout); + DEBUG ("rrdtool plugin: Flushing cache, timeout = %.3f", + CDTIME_T_TO_DOUBLE (timeout)); - now = time (NULL); + now = cdtime (); + timeout = TIME_T_TO_CDTIME_T (timeout); /* Build a list of entries to be flushed */ iter = c_avl_get_iterator (cache); @@ -863,7 +537,9 @@ static void rrd_cache_flush (int timeout) { if (rc->flags != FLAG_NONE) continue; - else if ((now - rc->first_value) < timeout) + /* timeout == 0 => flush everything */ + else if ((timeout != 0) + && ((now - rc->first_value) < timeout)) continue; else if (rc->values_num > 0) { @@ -916,10 +592,11 @@ static void rrd_cache_flush (int timeout) cache_flush_last = now; } /* void rrd_cache_flush */ -static int rrd_cache_flush_identifier (int timeout, const char *identifier) +static int rrd_cache_flush_identifier (cdtime_t timeout, + const char *identifier) { rrd_cache_t *rc; - time_t now; + cdtime_t now; int status; char key[2048]; @@ -929,7 +606,7 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier) return (0); } - now = time (NULL); + now = cdtime (); if (datadir == NULL) snprintf (key, sizeof (key), "%s.rrd", @@ -942,7 +619,7 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier) status = c_avl_get (cache, key, (void *) &rc); if (status != 0) { - WARNING ("rrdtool plugin: rrd_cache_flush_identifier: " + INFO ("rrdtool plugin: rrd_cache_flush_identifier: " "c_avl_get (%s) failed. Does that file really exist?", key); return (status); @@ -973,8 +650,43 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier) return (status); } /* int rrd_cache_flush_identifier */ +static int64_t rrd_get_random_variation (void) +{ + double dbl_timeout; + cdtime_t ctm_timeout; + double rand_fact; + _Bool negative; + int64_t ret; + + if (random_timeout <= 0) + return (0); + + /* Assure that "cache_timeout + random_variation" is never negative. */ + if (random_timeout > cache_timeout) + { + INFO ("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.", + CDTIME_T_TO_DOUBLE (cache_timeout)); + random_timeout = cache_timeout; + } + + /* This seems a bit complicated, but "random_timeout" is likely larger than + * RAND_MAX, so we can't simply use modulo here. */ + dbl_timeout = CDTIME_T_TO_DOUBLE (random_timeout); + rand_fact = ((double) random ()) + / ((double) RAND_MAX); + negative = (_Bool) (random () % 2); + + ctm_timeout = DOUBLE_TO_CDTIME_T (dbl_timeout * rand_fact); + + ret = (int64_t) ctm_timeout; + if (negative) + ret *= -1; + + return (ret); +} /* int64_t rrd_get_random_variation */ + static int rrd_cache_insert (const char *filename, - const char *value, time_t value_time) + const char *value, cdtime_t value_time) { rrd_cache_t *rc = NULL; int new_rc = 0; @@ -995,13 +707,14 @@ static int rrd_cache_insert (const char *filename, if (rc == NULL) { - rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t)); + rc = malloc (sizeof (*rc)); if (rc == NULL) return (-1); rc->values_num = 0; rc->values = NULL; rc->first_value = 0; rc->last_value = 0; + rc->random_variation = rrd_get_random_variation (); rc->flags = FLAG_NONE; new_rc = 1; } @@ -1009,9 +722,9 @@ static int rrd_cache_insert (const char *filename, if (rc->last_value >= value_time) { pthread_mutex_unlock (&cache_lock); - WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)", - (unsigned int) rc->last_value, - (unsigned int) value_time); + DEBUG ("rrdtool plugin: (rc->last_value = %"PRIu64") " + ">= (value_time = %"PRIu64")", + rc->last_value, value_time); return (-1); } @@ -1068,11 +781,11 @@ static int rrd_cache_insert (const char *filename, } DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; " - "values_num = %i; age = %lu;", + "values_num = %i; age = %.3f;", filename, rc->values_num, - (unsigned long)(rc->last_value - rc->first_value)); + CDTIME_T_TO_DOUBLE (rc->last_value - rc->first_value)); - if ((rc->last_value - rc->first_value) >= cache_timeout) + if ((rc->last_value - rc->first_value) >= (cache_timeout + rc->random_variation)) { /* XXX: If you need to lock both, cache_lock and queue_lock, at * the same time, ALWAYS lock `cache_lock' first! */ @@ -1083,6 +796,8 @@ static int rrd_cache_insert (const char *filename, status = rrd_queue_enqueue (filename, &queue_head, &queue_tail); if (status == 0) rc->flags = FLAG_QUEUED; + + rc->random_variation = rrd_get_random_variation (); } else { @@ -1091,7 +806,7 @@ static int rrd_cache_insert (const char *filename, } if ((cache_timeout > 0) && - ((time (NULL) - cache_flush_last) > cache_flush_timeout)) + ((cdtime () - cache_flush_last) > cache_flush_timeout)) rrd_cache_flush (cache_flush_timeout); pthread_mutex_unlock (&cache_lock); @@ -1099,6 +814,59 @@ static int rrd_cache_insert (const char *filename, return (0); } /* int rrd_cache_insert */ +static int rrd_cache_destroy (void) /* {{{ */ +{ + void *key = NULL; + void *value = NULL; + + int non_empty = 0; + + pthread_mutex_lock (&cache_lock); + + if (cache == NULL) + { + pthread_mutex_unlock (&cache_lock); + return (0); + } + + while (c_avl_pick (cache, &key, &value) == 0) + { + rrd_cache_t *rc; + int i; + + sfree (key); + key = NULL; + + rc = value; + value = NULL; + + if (rc->values_num > 0) + non_empty++; + + for (i = 0; i < rc->values_num; i++) + sfree (rc->values[i]); + sfree (rc->values); + sfree (rc); + } + + c_avl_destroy (cache); + cache = NULL; + + if (non_empty > 0) + { + INFO ("rrdtool plugin: %i cache %s had values when destroying the cache.", + non_empty, (non_empty == 1) ? "entry" : "entries"); + } + else + { + DEBUG ("rrdtool plugin: No values have been lost " + "when destroying the cache."); + } + + pthread_mutex_unlock (&cache_lock); + return (0); +} /* }}} int rrd_cache_destroy */ + static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr) { int a = *((int *) a_ptr); @@ -1112,13 +880,17 @@ static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr) return (0); } /* int rrd_compare_numeric */ -static int rrd_write (const data_set_t *ds, const value_list_t *vl) +static int rrd_write (const data_set_t *ds, const value_list_t *vl, + user_data_t __attribute__((unused)) *user_data) { struct stat statbuf; char filename[512]; char values[512]; int status; + if (do_shutdown) + return (0); + if (0 != strcmp (ds->type, vl->type)) { ERROR ("rrdtool plugin: DS type does not match value list type"); return -1; @@ -1134,7 +906,9 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) { if (errno == ENOENT) { - if (rrd_create_file (filename, ds, vl)) + status = cu_rrd_create_file (filename, + ds, vl, &rrdcreate_config); + if (status != 0) return (-1); } else @@ -1158,7 +932,8 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) return (status); } /* int rrd_write */ -static int rrd_flush (int timeout, const char *identifier) +static int rrd_flush (cdtime_t timeout, const char *identifier, + __attribute__((unused)) user_data_t *user_data) { pthread_mutex_lock (&cache_lock); @@ -1177,7 +952,7 @@ static int rrd_config (const char *key, const char *value) { if (strcasecmp ("CacheTimeout", key) == 0) { - int tmp = atoi (value); + double tmp = atof (value); if (tmp < 0) { fprintf (stderr, "rrdtool: `CacheTimeout' must " @@ -1186,7 +961,7 @@ static int rrd_config (const char *key, const char *value) "be greater than 0.\n"); return (1); } - cache_timeout = tmp; + cache_timeout = DOUBLE_TO_CDTIME_T (tmp); } else if (strcasecmp ("CacheFlush", key) == 0) { @@ -1223,15 +998,15 @@ static int rrd_config (const char *key, const char *value) } else if (strcasecmp ("StepSize", key) == 0) { - stepsize = atoi (value); - if (stepsize < 0) - stepsize = 0; + int temp = atoi (value); + if (temp > 0) + rrdcreate_config.stepsize = temp; } else if (strcasecmp ("HeartBeat", key) == 0) { - heartbeat = atoi (value); - if (heartbeat < 0) - heartbeat = 0; + int temp = atoi (value); + if (temp > 0) + rrdcreate_config.heartbeat = temp; } else if (strcasecmp ("RRARows", key) == 0) { @@ -1244,7 +1019,7 @@ static int rrd_config (const char *key, const char *value) "be greater than 0.\n"); return (1); } - rrarows = tmp; + rrdcreate_config.rrarows = tmp; } else if (strcasecmp ("RRATimespan", key) == 0) { @@ -1263,8 +1038,8 @@ static int rrd_config (const char *key, const char *value) { dummy = NULL; - tmp_alloc = realloc (rra_timespans_custom, - sizeof (int) * (rra_timespans_custom_num + 1)); + tmp_alloc = realloc (rrdcreate_config.timespans, + sizeof (int) * (rrdcreate_config.timespans_num + 1)); if (tmp_alloc == NULL) { fprintf (stderr, "rrdtool: realloc failed.\n"); @@ -1272,15 +1047,15 @@ static int rrd_config (const char *key, const char *value) free (value_copy); return (1); } - rra_timespans_custom = tmp_alloc; - rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr); - if (rra_timespans_custom[rra_timespans_custom_num] != 0) - rra_timespans_custom_num++; + rrdcreate_config.timespans = tmp_alloc; + rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi (ptr); + if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0) + rrdcreate_config.timespans_num++; } /* while (strtok_r) */ - qsort (/* base = */ rra_timespans_custom, - /* nmemb = */ rra_timespans_custom_num, - /* size = */ sizeof (rra_timespans_custom[0]), + qsort (/* base = */ rrdcreate_config.timespans, + /* nmemb = */ rrdcreate_config.timespans_num, + /* size = */ sizeof (rrdcreate_config.timespans[0]), /* compar = */ rrd_compare_numeric); free (value_copy); @@ -1296,7 +1071,7 @@ static int rrd_config (const char *key, const char *value) "be in the range 0 to 1 (exclusive)."); return (1); } - xff = tmp; + rrdcreate_config.xff = tmp; } else if (strcasecmp ("WritesPerSecond", key) == 0) { @@ -1317,6 +1092,23 @@ static int rrd_config (const char *key, const char *value) write_rate = 1.0 / wps; } } + else if (strcasecmp ("RandomTimeout", key) == 0) + { + double tmp; + + tmp = atof (value); + if (tmp < 0.0) + { + fprintf (stderr, "rrdtool: `RandomTimeout' must " + "be greater than or equal to zero.\n"); + ERROR ("rrdtool: `RandomTimeout' must " + "be greater then or equal to zero."); + } + else + { + random_timeout = DOUBLE_TO_CDTIME_T (tmp); + } + } else { return (-1); @@ -1327,7 +1119,7 @@ static int rrd_config (const char *key, const char *value) static int rrd_shutdown (void) { pthread_mutex_lock (&cache_lock); - rrd_cache_flush (-1); + rrd_cache_flush (0); pthread_mutex_unlock (&cache_lock); pthread_mutex_lock (&queue_lock); @@ -1335,31 +1127,52 @@ static int rrd_shutdown (void) pthread_cond_signal (&queue_cond); pthread_mutex_unlock (&queue_lock); + if ((queue_thread_running != 0) + && ((queue_head != NULL) || (flushq_head != NULL))) + { + INFO ("rrdtool plugin: Shutting down the queue thread. " + "This may take a while."); + } + else if (queue_thread_running != 0) + { + INFO ("rrdtool plugin: Shutting down the queue thread."); + } + /* Wait for all the values to be written to disk before returning. */ - if (queue_thread != 0) + if (queue_thread_running != 0) { pthread_join (queue_thread, NULL); - queue_thread = 0; + memset (&queue_thread, 0, sizeof (queue_thread)); + queue_thread_running = 0; DEBUG ("rrdtool plugin: queue_thread exited."); } + rrd_cache_destroy (); + return (0); } /* int rrd_shutdown */ static int rrd_init (void) { + static int init_once = 0; int status; - if (stepsize < 0) - stepsize = 0; - if (heartbeat <= 0) - heartbeat = 2 * stepsize; + if (init_once != 0) + return (0); + init_once = 1; + + if (rrdcreate_config.stepsize < 0) + rrdcreate_config.stepsize = 0; + if (rrdcreate_config.heartbeat <= 0) + rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize; - if ((heartbeat > 0) && (heartbeat < interval_g)) + if ((rrdcreate_config.heartbeat > 0) + && (rrdcreate_config.heartbeat < CDTIME_T_TO_TIME_T (interval_g))) WARNING ("rrdtool plugin: Your `heartbeat' is " "smaller than your `interval'. This will " "likely cause problems."); - else if ((stepsize > 0) && (stepsize < interval_g)) + else if ((rrdcreate_config.stepsize > 0) + && (rrdcreate_config.stepsize < CDTIME_T_TO_TIME_T (interval_g))) WARNING ("rrdtool plugin: Your `stepsize' is " "smaller than your `interval'. This will " "create needlessly big RRD-files."); @@ -1374,10 +1187,9 @@ static int rrd_init (void) return (-1); } - cache_flush_last = time (NULL); - if (cache_timeout < 2) + cache_flush_last = cdtime (); + if (cache_timeout == 0) { - cache_timeout = 0; cache_flush_timeout = 0; } else if (cache_flush_timeout < cache_timeout) @@ -1385,17 +1197,22 @@ static int rrd_init (void) pthread_mutex_unlock (&cache_lock); - status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL); + status = pthread_create (&queue_thread, /* attr = */ NULL, + rrd_queue_thread, /* args = */ NULL); if (status != 0) { ERROR ("rrdtool plugin: Cannot create queue-thread."); return (-1); } + queue_thread_running = 1; DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;" " heartbeat = %i; rrarows = %i; xff = %lf;", (datadir == NULL) ? "(null)" : datadir, - stepsize, heartbeat, rrarows, xff); + rrdcreate_config.stepsize, + rrdcreate_config.heartbeat, + rrdcreate_config.rrarows, + rrdcreate_config.xff); return (0); } /* int rrd_init */ @@ -1405,7 +1222,7 @@ void module_register (void) plugin_register_config ("rrdtool", rrd_config, config_keys, config_keys_num); plugin_register_init ("rrdtool", rrd_init); - plugin_register_write ("rrdtool", rrd_write); - plugin_register_flush ("rrdtool", rrd_flush); + plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL); + plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL); plugin_register_shutdown ("rrdtool", rrd_shutdown); }