X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdtool.c;h=01fca30aad69751bf9a75c6aa39af2ffa568f335;hb=99ea6c890424d5045cd33db8daa7c28d887b2a88;hp=ea96290555670542dfdcd116f90006dc85cf1c50;hpb=5afa2f82d43e2a7952d8f347f6f40b5778c59db8;p=collectd.git diff --git a/src/rrdtool.c b/src/rrdtool.c index ea962905..01fca30a 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -1,6 +1,8 @@ /** * collectd - src/rrdtool.c - * Copyright (C) 2006,2007 Florian octo Forster + * Copyright (C) 2006-2008 Florian octo Forster + * Copyright (C) 2008-2008 Sebastian Harl + * Copyright (C) 2009 Mariusz Gronczewski * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -17,12 +19,17 @@ * * Authors: * Florian octo Forster + * Sebastian Harl + * Mariusz Gronczewski **/ #include "collectd.h" #include "plugin.h" #include "common.h" #include "utils_avltree.h" +#include "utils_rrdcreate.h" + +#include #if HAVE_PTHREAD_H # include @@ -37,33 +44,33 @@ struct rrd_cache_s char **values; time_t first_value; time_t last_value; + int random_variation; + enum + { + FLAG_NONE = 0x00, + FLAG_QUEUED = 0x01, + FLAG_FLUSHQ = 0x02 + } flags; }; typedef struct rrd_cache_s rrd_cache_t; -/* - * Private variables - */ -static int rra_timespans[] = +enum rrd_queue_dir_e { - 3600, - 86400, - 604800, - 2678400, - 31622400 + QUEUE_INSERT_FRONT, + QUEUE_INSERT_BACK }; -static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans); - -static int *rra_timespans_custom = NULL; -static int rra_timespans_custom_num = 0; +typedef enum rrd_queue_dir_e rrd_queue_dir_t; -static char *rra_types[] = +struct rrd_queue_s { - "AVERAGE", - "MIN", - "MAX" + char *filename; + struct rrd_queue_s *next; }; -static int rra_types_num = STATIC_ARRAY_SIZE (rra_types); +typedef struct rrd_queue_s rrd_queue_t; +/* + * Private variables + */ static const char *config_keys[] = { "CacheTimeout", @@ -73,286 +80,119 @@ static const char *config_keys[] = "HeartBeat", "RRARows", "RRATimespan", - "XFF" + "XFF", + "WritesPerSecond", + "RandomTimeout" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); -static char *datadir = NULL; -static int stepsize = 0; -static int heartbeat = 0; -static int rrarows = 1200; -static double xff = 0.1; +/* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat + * is zero a default, depending on the `interval' member of the value list is + * being used. */ +static char *datadir = NULL; +static double write_rate = 0.0; +static rrdcreate_config_t rrdcreate_config = +{ + /* stepsize = */ 0, + /* heartbeat = */ 0, + /* rrarows = */ 1200, + /* xff = */ 0.1, + + /* timespans = */ NULL, + /* timespans_num = */ 0, + + /* consolidation_functions = */ NULL, + /* consolidation_functions_num = */ 0 +}; +/* XXX: If you need to lock both, cache_lock and queue_lock, at the same time, + * ALWAYS lock `cache_lock' first! */ static int cache_timeout = 0; static int cache_flush_timeout = 0; +static int random_timeout = 1; static time_t cache_flush_last; -static avl_tree_t *cache = NULL; +static c_avl_tree_t *cache = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; -/* * * * * * * * * * - * WARNING: Magic * - * * * * * * * * * */ -static int rra_get (char ***ret) -{ - static char **rra_def = NULL; - static int rra_num = 0; - - int *rts; - int rts_num; - - int rra_max; - - int span; - - int cdp_num; - int cdp_len; - int i, j; - - char buffer[64]; - - if ((rra_num != 0) && (rra_def != NULL)) - { - *ret = rra_def; - return (rra_num); - } - - /* Use the configured timespans or fall back to the built-in defaults */ - if (rra_timespans_custom_num != 0) - { - rts = rra_timespans_custom; - rts_num = rra_timespans_custom_num; - } - else - { - rts = rra_timespans; - rts_num = rra_timespans_num; - } - - rra_max = rts_num * rra_types_num; - - if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL) - return (-1); - memset (rra_def, '\0', (rra_max + 1) * sizeof (char *)); - - if ((stepsize <= 0) || (rrarows <= 0)) - { - *ret = NULL; - return (-1); - } - - cdp_len = 0; - for (i = 0; i < rts_num; i++) - { - span = rts[i]; - - if ((span / stepsize) < rrarows) - continue; - - if (cdp_len == 0) - cdp_len = 1; - else - cdp_len = (int) floor (((double) span) - / ((double) (rrarows * stepsize))); - - cdp_num = (int) ceil (((double) span) - / ((double) (cdp_len * stepsize))); - - for (j = 0; j < rra_types_num; j++) - { - if (rra_num >= rra_max) - break; - - if (snprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u", - rra_types[j], xff, - cdp_len, cdp_num) >= sizeof (buffer)) - { - ERROR ("rra_get: Buffer would have been truncated."); - continue; - } - - rra_def[rra_num++] = sstrdup (buffer); - } - } - -#if COLLECT_DEBUG - DEBUG ("rra_num = %i", rra_num); - for (i = 0; i < rra_num; i++) - DEBUG (" %s", rra_def[i]); +static rrd_queue_t *queue_head = NULL; +static rrd_queue_t *queue_tail = NULL; +static rrd_queue_t *flushq_head = NULL; +static rrd_queue_t *flushq_tail = NULL; +static pthread_t queue_thread; +static int queue_thread_running = 1; +static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; + +#if !HAVE_THREADSAFE_LIBRRD +static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER; #endif - *ret = rra_def; - return (rra_num); -} - -static void ds_free (int ds_num, char **ds_def) -{ - int i; - - for (i = 0; i < ds_num; i++) - if (ds_def[i] != NULL) - free (ds_def[i]); - free (ds_def); -} +static int do_shutdown = 0; -static int ds_get (char ***ret, const data_set_t *ds) +#if HAVE_THREADSAFE_LIBRRD +static int srrd_update (char *filename, char *template, + int argc, const char **argv) { - char **ds_def; - int ds_num; - - char min[32]; - char max[32]; - char buffer[128]; - - DEBUG ("ds->ds_num = %i", ds->ds_num); - - ds_def = (char **) malloc (ds->ds_num * sizeof (char *)); - if (ds_def == NULL) - { - char errbuf[1024]; - ERROR ("rrdtool plugin: malloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return (-1); - } - memset (ds_def, '\0', ds->ds_num * sizeof (char *)); - - for (ds_num = 0; ds_num < ds->ds_num; ds_num++) - { - data_source_t *d = ds->ds + ds_num; - char *type; - int status; - - ds_def[ds_num] = NULL; - - if (d->type == DS_TYPE_COUNTER) - type = "COUNTER"; - else if (d->type == DS_TYPE_GAUGE) - type = "GAUGE"; - else - { - ERROR ("rrdtool plugin: Unknown DS type: %i", - d->type); - break; - } - - if (isnan (d->min)) - { - strcpy (min, "U"); - } - else - { - snprintf (min, sizeof (min), "%lf", d->min); - min[sizeof (min) - 1] = '\0'; - } - - if (isnan (d->max)) - { - strcpy (max, "U"); - } - else - { - snprintf (max, sizeof (max), "%lf", d->max); - max[sizeof (max) - 1] = '\0'; - } - - status = snprintf (buffer, sizeof (buffer), - "DS:%s:%s:%i:%s:%s", - d->name, type, heartbeat, - min, max); - if ((status < 1) || (status >= sizeof (buffer))) - break; + int status; - ds_def[ds_num] = sstrdup (buffer); - } /* for ds_num = 0 .. ds->ds_num */ + optind = 0; /* bug in librrd? */ + rrd_clear_error (); -#if COLLECT_DEBUG -{ - int i; - DEBUG ("ds_num = %i", ds_num); - for (i = 0; i < ds_num; i++) - DEBUG (" %s", ds_def[i]); -} -#endif + status = rrd_update_r (filename, template, argc, (void *) argv); - if (ds_num != ds->ds_num) + if (status != 0) { - ds_free (ds_num, ds_def); - return (-1); + WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s", + filename, rrd_get_error ()); } - *ret = ds_def; - return (ds_num); -} + return (status); +} /* int srrd_update */ +/* #endif HAVE_THREADSAFE_LIBRRD */ -static int rrd_create_file (char *filename, const data_set_t *ds) +#else /* !HAVE_THREADSAFE_LIBRRD */ +static int srrd_update (char *filename, char *template, + int argc, const char **argv) { - char **argv; - int argc; - char **rra_def; - int rra_num; - char **ds_def; - int ds_num; - int i, j; - char stepsize_str[16]; - int status = 0; - - if (check_create_dir (filename)) - return (-1); - - if ((rra_num = rra_get (&rra_def)) < 1) - { - ERROR ("rrd_create_file failed: Could not calculate RRAs"); - return (-1); - } - - if ((ds_num = ds_get (&ds_def, ds)) < 1) - { - ERROR ("rrd_create_file failed: Could not calculate DSes"); - return (-1); - } + int status; - argc = ds_num + rra_num + 4; + int new_argc; + char **new_argv; - if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL) - { - char errbuf[1024]; - ERROR ("rrd_create failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return (-1); - } + assert (template == NULL); - status = snprintf (stepsize_str, sizeof (stepsize_str), - "%i", stepsize); - if ((status < 1) || (status >= sizeof (stepsize_str))) + new_argc = 2 + argc; + new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *)); + if (new_argv == NULL) { - ERROR ("rrdtool plugin: snprintf failed."); + ERROR ("rrdtool plugin: malloc failed."); return (-1); } - argv[0] = "create"; - argv[1] = filename; - argv[2] = "-s"; - argv[3] = stepsize_str; + new_argv[0] = "update"; + new_argv[1] = filename; - j = 4; - for (i = 0; i < ds_num; i++) - argv[j++] = ds_def[i]; - for (i = 0; i < rra_num; i++) - argv[j++] = rra_def[i]; - argv[j] = NULL; + memcpy (new_argv + 2, argv, argc * sizeof (char *)); + new_argv[new_argc] = NULL; + pthread_mutex_lock (&librrd_lock); optind = 0; /* bug in librrd? */ rrd_clear_error (); - if (rrd_create (argc, argv) == -1) + + status = rrd_update (new_argc, new_argv); + pthread_mutex_unlock (&librrd_lock); + + if (status != 0) { - ERROR ("rrd_create failed: %s: %s", filename, rrd_get_error ()); - status = -1; + WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s", + argv[1], rrd_get_error ()); } - free (argv); - ds_free (ds_num, ds_def); + sfree (new_argv); return (status); -} +} /* int srrd_update */ +#endif /* !HAVE_THREADSAFE_LIBRRD */ static int value_list_to_string (char *buffer, int buffer_len, const data_set_t *ds, const value_list_t *vl) @@ -361,9 +201,9 @@ static int value_list_to_string (char *buffer, int buffer_len, int status; int i; - memset (buffer, '\0', sizeof (buffer_len)); + memset (buffer, '\0', buffer_len); - status = snprintf (buffer, buffer_len, "%u", (unsigned int) vl->time); + status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time); if ((status < 1) || (status >= buffer_len)) return (-1); offset = status; @@ -371,15 +211,23 @@ static int value_list_to_string (char *buffer, int buffer_len, for (i = 0; i < ds->ds_num; i++) { if ((ds->ds[i].type != DS_TYPE_COUNTER) - && (ds->ds[i].type != DS_TYPE_GAUGE)) + && (ds->ds[i].type != DS_TYPE_GAUGE) + && (ds->ds[i].type != DS_TYPE_DERIVE) + && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) return (-1); if (ds->ds[i].type == DS_TYPE_COUNTER) - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, ":%llu", vl->values[i].counter); - else - status = snprintf (buffer + offset, buffer_len - offset, + else if (ds->ds[i].type == DS_TYPE_GAUGE) + status = ssnprintf (buffer + offset, buffer_len - offset, ":%lf", vl->values[i].gauge); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + status = ssnprintf (buffer + offset, buffer_len - offset, + ":%"PRIi64, vl->values[i].derive); + else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ + status = ssnprintf (buffer + offset, buffer_len - offset, + ":%"PRIu64, vl->values[i].absolute); if ((status < 1) || (status >= (buffer_len - offset))) return (-1); @@ -391,42 +239,42 @@ static int value_list_to_string (char *buffer, int buffer_len, } /* int value_list_to_string */ static int value_list_to_filename (char *buffer, int buffer_len, - const data_set_t *ds, const value_list_t *vl) + const data_set_t __attribute__((unused)) *ds, const value_list_t *vl) { int offset = 0; int status; if (datadir != NULL) { - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", datadir); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; } - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", vl->host); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; if (strlen (vl->plugin_instance) > 0) - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s-%s/", vl->plugin, vl->plugin_instance); else - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", vl->plugin); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; if (strlen (vl->type_instance) > 0) - status = snprintf (buffer + offset, buffer_len - offset, - "%s-%s.rrd", ds->type, vl->type_instance); + status = ssnprintf (buffer + offset, buffer_len - offset, + "%s-%s.rrd", vl->type, vl->type_instance); else - status = snprintf (buffer + offset, buffer_len - offset, - "%s.rrd", ds->type); + status = ssnprintf (buffer + offset, buffer_len - offset, + "%s.rrd", vl->type); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; @@ -434,138 +282,233 @@ static int value_list_to_filename (char *buffer, int buffer_len, return (0); } /* int value_list_to_filename */ -static rrd_cache_t *rrd_cache_insert (const char *filename, - const char *value, time_t value_time) +static void *rrd_queue_thread (void __attribute__((unused)) *data) { - rrd_cache_t *rc = NULL; - int new_rc = 0; + struct timeval tv_next_update; + struct timeval tv_now; - if (cache != NULL) - avl_get (cache, filename, (void *) &rc); + gettimeofday (&tv_next_update, /* timezone = */ NULL); - if (rc == NULL) + while (42) { - rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t)); - if (rc == NULL) - return (NULL); - rc->values_num = 0; - rc->values = NULL; - rc->first_value = 0; - rc->last_value = 0; - new_rc = 1; - } + rrd_queue_t *queue_entry; + rrd_cache_t *cache_entry; + char **values; + int values_num; + int status; + int i; + + values = NULL; + values_num = 0; + + pthread_mutex_lock (&queue_lock); + /* Wait for values to arrive */ + while (true) + { + struct timespec ts_wait; + + while ((flushq_head == NULL) && (queue_head == NULL) + && (do_shutdown == 0)) + pthread_cond_wait (&queue_cond, &queue_lock); + + if ((flushq_head == NULL) && (queue_head == NULL)) + break; + + /* Don't delay if there's something to flush */ + if (flushq_head != NULL) + break; + + /* Don't delay if we're shutting down */ + if (do_shutdown != 0) + break; + + /* Don't delay if no delay was configured. */ + if (write_rate <= 0.0) + break; + + gettimeofday (&tv_now, /* timezone = */ NULL); + status = timeval_cmp (tv_next_update, tv_now, NULL); + /* We're good to go */ + if (status <= 0) + break; + + /* We're supposed to wait a bit with this update, so we'll + * wait for the next addition to the queue or to the end of + * the wait period - whichever comes first. */ + ts_wait.tv_sec = tv_next_update.tv_sec; + ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec; + + status = pthread_cond_timedwait (&queue_cond, &queue_lock, + &ts_wait); + if (status == ETIMEDOUT) + break; + } /* while (true) */ + + /* XXX: If you need to lock both, cache_lock and queue_lock, at + * the same time, ALWAYS lock `cache_lock' first! */ + + /* We're in the shutdown phase */ + if ((flushq_head == NULL) && (queue_head == NULL)) + { + pthread_mutex_unlock (&queue_lock); + break; + } + + if (flushq_head != NULL) + { + /* Dequeue the first flush entry */ + queue_entry = flushq_head; + if (flushq_head == flushq_tail) + flushq_head = flushq_tail = NULL; + else + flushq_head = flushq_head->next; + } + else /* if (queue_head != NULL) */ + { + /* Dequeue the first regular entry */ + queue_entry = queue_head; + if (queue_head == queue_tail) + queue_head = queue_tail = NULL; + else + queue_head = queue_head->next; + } + + /* Unlock the queue again */ + pthread_mutex_unlock (&queue_lock); + + /* We now need the cache lock so the entry isn't updated while + * we make a copy of it's values */ + pthread_mutex_lock (&cache_lock); + + status = c_avl_get (cache, queue_entry->filename, + (void *) &cache_entry); + + if (status == 0) + { + values = cache_entry->values; + values_num = cache_entry->values_num; - if (rc->last_value >= value_time) - { - WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)", - (unsigned int) rc->last_value, - (unsigned int) value_time); - return (NULL); - } + cache_entry->values = NULL; + cache_entry->values_num = 0; + cache_entry->flags = FLAG_NONE; + } - rc->values = (char **) realloc ((void *) rc->values, - (rc->values_num + 1) * sizeof (char *)); - if (rc->values == NULL) - { - char errbuf[1024]; - ERROR ("rrdtool plugin: realloc failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - if (cache != NULL) + pthread_mutex_unlock (&cache_lock); + + if (status != 0) { - void *cache_key = NULL; - avl_remove (cache, filename, &cache_key, NULL); - sfree (cache_key); + sfree (queue_entry->filename); + sfree (queue_entry); + continue; } - free (rc); - return (NULL); - } - rc->values[rc->values_num] = strdup (value); - if (rc->values[rc->values_num] != NULL) - rc->values_num++; + /* Update `tv_next_update' */ + if (write_rate > 0.0) + { + gettimeofday (&tv_now, /* timezone = */ NULL); + tv_next_update.tv_sec = tv_now.tv_sec; + tv_next_update.tv_usec = tv_now.tv_usec + + ((suseconds_t) (1000000 * write_rate)); + while (tv_next_update.tv_usec > 1000000) + { + tv_next_update.tv_sec++; + tv_next_update.tv_usec -= 1000000; + } + } + + /* Write the values to the RRD-file */ + srrd_update (queue_entry->filename, NULL, + values_num, (const char **)values); + DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s", + values_num, (values_num == 1) ? "" : "s", + queue_entry->filename); + + for (i = 0; i < values_num; i++) + { + sfree (values[i]); + } + sfree (values); + sfree (queue_entry->filename); + sfree (queue_entry); + } /* while (42) */ - if (rc->values_num == 1) - rc->first_value = value_time; - rc->last_value = value_time; + pthread_exit ((void *) 0); + return ((void *) 0); +} /* void *rrd_queue_thread */ - /* Insert if this is the first value */ - if ((cache != NULL) && (new_rc == 1)) - { - void *cache_key = strdup (filename); +static int rrd_queue_enqueue (const char *filename, + rrd_queue_t **head, rrd_queue_t **tail) +{ + rrd_queue_t *queue_entry; - if (cache_key == NULL) - { - char errbuf[1024]; - ERROR ("rrdtool plugin: strdup failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - sfree (rc->values[0]); - sfree (rc->values); - sfree (rc); - return (NULL); - } + queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t)); + if (queue_entry == NULL) + return (-1); - avl_insert (cache, cache_key, rc); - } + queue_entry->filename = strdup (filename); + if (queue_entry->filename == NULL) + { + free (queue_entry); + return (-1); + } - DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value, - (unsigned int) value_time, (void *) rc); + queue_entry->next = NULL; - return (rc); -} /* rrd_cache_t *rrd_cache_insert */ + pthread_mutex_lock (&queue_lock); -static int rrd_write_cache_entry (const char *filename, rrd_cache_t *rc) -{ - char **argv; - int argc; + if (*tail == NULL) + *head = queue_entry; + else + (*tail)->next = queue_entry; + *tail = queue_entry; - char *fn; - int status; + pthread_cond_signal (&queue_cond); + pthread_mutex_unlock (&queue_lock); - int i; + return (0); +} /* int rrd_queue_enqueue */ - if (rc->values_num < 1) - return (0); +static int rrd_queue_dequeue (const char *filename, + rrd_queue_t **head, rrd_queue_t **tail) +{ + rrd_queue_t *this; + rrd_queue_t *prev; - argc = rc->values_num + 2; - argv = (char **) malloc ((argc + 1) * sizeof (char *)); - if (argv == NULL) - return (-1); + pthread_mutex_lock (&queue_lock); - fn = strdup (filename); - if (fn == NULL) - { - free (argv); - return (-1); - } + prev = NULL; + this = *head; - argv[0] = "update"; - argv[1] = fn; - memcpy (argv + 2, rc->values, rc->values_num * sizeof (char *)); - argv[argc] = NULL; + while (this != NULL) + { + if (strcmp (this->filename, filename) == 0) + break; + + prev = this; + this = this->next; + } - DEBUG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv); + if (this == NULL) + { + pthread_mutex_unlock (&queue_lock); + return (-1); + } - optind = 0; /* bug in librrd? */ - rrd_clear_error (); - status = rrd_update (argc, argv); - if (status != 0) - { - WARNING ("rrd_update failed: %s: %s", - filename, rrd_get_error ()); - status = -1; - } + if (prev == NULL) + *head = this->next; + else + prev->next = this->next; - free (argv); - free (fn); - /* Free the value list of `rc' */ - for (i = 0; i < rc->values_num; i++) - free (rc->values[i]); - free (rc->values); - rc->values = NULL; - rc->values_num = 0; + if (this->next == NULL) + *tail = prev; - return (status); -} /* int rrd_write_cache_entry */ + pthread_mutex_unlock (&queue_lock); + + sfree (this->filename); + sfree (this); + + return (0); +} /* int rrd_queue_dequeue */ static void rrd_cache_flush (int timeout) { @@ -576,69 +519,349 @@ static void rrd_cache_flush (int timeout) int keys_num = 0; char *key; - avl_iterator_t *iter; + c_avl_iterator_t *iter; int i; - if (cache == NULL) - return; - - DEBUG ("Flushing cache, timeout = %i", timeout); + DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout); now = time (NULL); /* Build a list of entries to be flushed */ - iter = avl_get_iterator (cache); - while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) + iter = c_avl_get_iterator (cache); + while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) { - DEBUG ("key = %s; age = %i;", key, now - rc->first_value); - if ((now - rc->first_value) >= timeout) + if (rc->flags != FLAG_NONE) + continue; + else if ((now - rc->first_value) < timeout) + continue; + else if (rc->values_num > 0) { - keys = (char **) realloc ((void *) keys, + int status; + + status = rrd_queue_enqueue (key, &queue_head, &queue_tail); + if (status == 0) + rc->flags = FLAG_QUEUED; + } + else /* ancient and no values -> waste of memory */ + { + char **tmp = (char **) realloc ((void *) keys, (keys_num + 1) * sizeof (char *)); - if (keys == NULL) + if (tmp == NULL) { char errbuf[1024]; ERROR ("rrdtool plugin: " "realloc failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); - avl_iterator_destroy (iter); + c_avl_iterator_destroy (iter); + sfree (keys); return; } + keys = tmp; keys[keys_num] = key; keys_num++; } - } /* while (avl_iterator_next) */ - avl_iterator_destroy (iter); + } /* while (c_avl_iterator_next) */ + c_avl_iterator_destroy (iter); for (i = 0; i < keys_num; i++) { - if (avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0) + if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0) { - DEBUG ("avl_remove (%s) failed.", keys[i]); + DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]); continue; } - rrd_write_cache_entry (keys[i], rc); - /* rc's value-list is free's by `rrd_write_cache_entry' */ + assert (rc->values == NULL); + assert (rc->values_num == 0); + sfree (rc); sfree (key); keys[i] = NULL; } /* for (i = 0..keys_num) */ - free (keys); - DEBUG ("Flushed %i value(s)", keys_num); + sfree (keys); cache_flush_last = now; } /* void rrd_cache_flush */ -static int rrd_write (const data_set_t *ds, const value_list_t *vl) +static int rrd_cache_flush_identifier (int timeout, const char *identifier) +{ + rrd_cache_t *rc; + time_t now; + int status; + char key[2048]; + + if (identifier == NULL) + { + rrd_cache_flush (timeout); + return (0); + } + + now = time (NULL); + + if (datadir == NULL) + snprintf (key, sizeof (key), "%s.rrd", + identifier); + else + snprintf (key, sizeof (key), "%s/%s.rrd", + datadir, identifier); + key[sizeof (key) - 1] = 0; + + status = c_avl_get (cache, key, (void *) &rc); + if (status != 0) + { + INFO ("rrdtool plugin: rrd_cache_flush_identifier: " + "c_avl_get (%s) failed. Does that file really exist?", + key); + return (status); + } + + if (rc->flags == FLAG_FLUSHQ) + { + status = 0; + } + else if (rc->flags == FLAG_QUEUED) + { + rrd_queue_dequeue (key, &queue_head, &queue_tail); + status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail); + if (status == 0) + rc->flags = FLAG_FLUSHQ; + } + else if ((now - rc->first_value) < timeout) + { + status = 0; + } + else if (rc->values_num > 0) + { + status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail); + if (status == 0) + rc->flags = FLAG_FLUSHQ; + } + + return (status); +} /* int rrd_cache_flush_identifier */ + +static int rrd_cache_insert (const char *filename, + const char *value, time_t value_time) +{ + rrd_cache_t *rc = NULL; + int new_rc = 0; + char **values_new; + + pthread_mutex_lock (&cache_lock); + + /* This shouldn't happen, but it did happen at least once, so we'll be + * careful. */ + if (cache == NULL) + { + pthread_mutex_unlock (&cache_lock); + WARNING ("rrdtool plugin: cache == NULL."); + return (-1); + } + + c_avl_get (cache, filename, (void *) &rc); + + if (rc == NULL) + { + rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t)); + if (rc == NULL) + return (-1); + rc->values_num = 0; + rc->values = NULL; + rc->first_value = 0; + rc->last_value = 0; + rc->random_variation = 0; + rc->flags = FLAG_NONE; + new_rc = 1; + } + + if (rc->last_value >= value_time) + { + pthread_mutex_unlock (&cache_lock); + WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)", + (unsigned int) rc->last_value, + (unsigned int) value_time); + return (-1); + } + + values_new = (char **) realloc ((void *) rc->values, + (rc->values_num + 1) * sizeof (char *)); + if (values_new == NULL) + { + char errbuf[1024]; + void *cache_key = NULL; + + sstrerror (errno, errbuf, sizeof (errbuf)); + + c_avl_remove (cache, filename, &cache_key, NULL); + pthread_mutex_unlock (&cache_lock); + + ERROR ("rrdtool plugin: realloc failed: %s", errbuf); + + sfree (cache_key); + sfree (rc->values); + sfree (rc); + return (-1); + } + rc->values = values_new; + + rc->values[rc->values_num] = strdup (value); + if (rc->values[rc->values_num] != NULL) + rc->values_num++; + + if (rc->values_num == 1) + rc->first_value = value_time; + rc->last_value = value_time; + + /* Insert if this is the first value */ + if (new_rc == 1) + { + void *cache_key = strdup (filename); + + if (cache_key == NULL) + { + char errbuf[1024]; + sstrerror (errno, errbuf, sizeof (errbuf)); + + pthread_mutex_unlock (&cache_lock); + + ERROR ("rrdtool plugin: strdup failed: %s", errbuf); + + sfree (rc->values[0]); + sfree (rc->values); + sfree (rc); + return (-1); + } + + c_avl_insert (cache, cache_key, rc); + } + + DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; " + "values_num = %i; age = %lu;", + filename, rc->values_num, + (unsigned long)(rc->last_value - rc->first_value)); + + if ((rc->last_value + rc->random_variation - rc->first_value) >= cache_timeout) + { + /* XXX: If you need to lock both, cache_lock and queue_lock, at + * the same time, ALWAYS lock `cache_lock' first! */ + if (rc->flags == FLAG_NONE) + { + int status; + + status = rrd_queue_enqueue (filename, &queue_head, &queue_tail); + if (status == 0) + rc->flags = FLAG_QUEUED; + + /* Update the jitter value. Negative values are + * slightly preferred. */ + if (random_timeout > 0) + { + rc->random_variation = (rand () % (2 * random_timeout)) + - random_timeout; + } + else + { + rc->random_variation = 0; + } + } + else + { + DEBUG ("rrdtool plugin: `%s' is already queued.", filename); + } + } + + if ((cache_timeout > 0) && + ((time (NULL) - cache_flush_last) > cache_flush_timeout)) + rrd_cache_flush (cache_flush_timeout); + + pthread_mutex_unlock (&cache_lock); + + return (0); +} /* int rrd_cache_insert */ + +static int rrd_cache_destroy (void) /* {{{ */ +{ + void *key = NULL; + void *value = NULL; + + int non_empty = 0; + + pthread_mutex_lock (&cache_lock); + + if (cache == NULL) + { + pthread_mutex_unlock (&cache_lock); + return (0); + } + + while (c_avl_pick (cache, &key, &value) == 0) + { + rrd_cache_t *rc; + int i; + + sfree (key); + key = NULL; + + rc = value; + value = NULL; + + if (rc->values_num > 0) + non_empty++; + + for (i = 0; i < rc->values_num; i++) + sfree (rc->values[i]); + sfree (rc->values); + sfree (rc); + } + + c_avl_destroy (cache); + cache = NULL; + + if (non_empty > 0) + { + INFO ("rrdtool plugin: %i cache %s had values when destroying the cache.", + non_empty, (non_empty == 1) ? "entry" : "entries"); + } + else + { + DEBUG ("rrdtool plugin: No values have been lost " + "when destroying the cache."); + } + + pthread_mutex_unlock (&cache_lock); + return (0); +} /* }}} int rrd_cache_destroy */ + +static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr) +{ + int a = *((int *) a_ptr); + int b = *((int *) b_ptr); + + if (a < b) + return (-1); + else if (a > b) + return (1); + else + return (0); +} /* int rrd_compare_numeric */ + +static int rrd_write (const data_set_t *ds, const value_list_t *vl, + user_data_t __attribute__((unused)) *user_data) { struct stat statbuf; char filename[512]; char values[512]; - rrd_cache_t *rc; - time_t now; + int status; + + if (do_shutdown) + return (0); + + if (0 != strcmp (ds->type, vl->type)) { + ERROR ("rrdtool plugin: DS type does not match value list type"); + return -1; + } if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0) return (-1); @@ -650,7 +873,9 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) { if (errno == ENOENT) { - if (rrd_create_file (filename, ds)) + status = cu_rrd_create_file (filename, + ds, vl, &rrdcreate_config); + if (status != 0) return (-1); } else @@ -669,38 +894,26 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) return (-1); } + status = rrd_cache_insert (filename, values, vl->time); + + return (status); +} /* int rrd_write */ + +static int rrd_flush (int timeout, const char *identifier, + user_data_t __attribute__((unused)) *user_data) +{ pthread_mutex_lock (&cache_lock); - rc = rrd_cache_insert (filename, values, vl->time); - if (rc == NULL) - { - pthread_mutex_unlock (&cache_lock); - return (-1); - } - if (cache == NULL) - { - rrd_write_cache_entry (filename, rc); - /* rc's value-list is free's by `rrd_write_cache_entry' */ - sfree (rc); + if (cache == NULL) { pthread_mutex_unlock (&cache_lock); return (0); } - now = time (NULL); - - DEBUG ("age (%s) = %i", filename, now - rc->first_value); - - /* `rc' is not free'd here, because we'll likely reuse it. If not, then - * the next flush will remove this entry. */ - if ((now - rc->first_value) >= cache_timeout) - rrd_write_cache_entry (filename, rc); - - if ((now - cache_flush_last) >= cache_flush_timeout) - rrd_cache_flush (cache_flush_timeout); + rrd_cache_flush_identifier (timeout, identifier); pthread_mutex_unlock (&cache_lock); return (0); -} /* int rrd_write */ +} /* int rrd_flush */ static int rrd_config (const char *key, const char *value) { @@ -711,6 +924,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `CacheTimeout' must " "be greater than 0.\n"); + ERROR ("rrdtool: `CacheTimeout' must " + "be greater than 0.\n"); return (1); } cache_timeout = tmp; @@ -722,6 +937,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `CacheFlush' must " "be greater than 0.\n"); + ERROR ("rrdtool: `CacheFlush' must " + "be greater than 0.\n"); return (1); } cache_flush_timeout = tmp; @@ -748,25 +965,15 @@ static int rrd_config (const char *key, const char *value) } else if (strcasecmp ("StepSize", key) == 0) { - int tmp = atoi (value); - if (tmp <= 0) - { - fprintf (stderr, "rrdtool: `StepSize' must " - "be greater than 0.\n"); - return (1); - } - stepsize = tmp; + int temp = atoi (value); + if (temp > 0) + rrdcreate_config.stepsize = temp; } else if (strcasecmp ("HeartBeat", key) == 0) { - int tmp = atoi (value); - if (tmp <= 0) - { - fprintf (stderr, "rrdtool: `HeartBeat' must " - "be greater than 0.\n"); - return (1); - } - heartbeat = tmp; + int temp = atoi (value); + if (temp > 0) + rrdcreate_config.heartbeat = temp; } else if (strcasecmp ("RRARows", key) == 0) { @@ -775,9 +982,11 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `RRARows' must " "be greater than 0.\n"); + ERROR ("rrdtool: `RRARows' must " + "be greater than 0.\n"); return (1); } - rrarows = tmp; + rrdcreate_config.rrarows = tmp; } else if (strcasecmp ("RRATimespan", key) == 0) { @@ -796,19 +1005,26 @@ static int rrd_config (const char *key, const char *value) { dummy = NULL; - tmp_alloc = realloc (rra_timespans_custom, - sizeof (int) * (rra_timespans_custom_num + 1)); + tmp_alloc = realloc (rrdcreate_config.timespans, + sizeof (int) * (rrdcreate_config.timespans_num + 1)); if (tmp_alloc == NULL) { fprintf (stderr, "rrdtool: realloc failed.\n"); + ERROR ("rrdtool: realloc failed.\n"); free (value_copy); return (1); } - rra_timespans_custom = tmp_alloc; - rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr); - if (rra_timespans_custom[rra_timespans_custom_num] != 0) - rra_timespans_custom_num++; + rrdcreate_config.timespans = tmp_alloc; + rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi (ptr); + if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0) + rrdcreate_config.timespans_num++; } /* while (strtok_r) */ + + qsort (/* base = */ rrdcreate_config.timespans, + /* nmemb = */ rrdcreate_config.timespans_num, + /* size = */ sizeof (rrdcreate_config.timespans[0]), + /* compar = */ rrd_compare_numeric); + free (value_copy); } else if (strcasecmp ("XFF", key) == 0) @@ -818,9 +1034,47 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `XFF' must " "be in the range 0 to 1 (exclusive)."); + ERROR ("rrdtool: `XFF' must " + "be in the range 0 to 1 (exclusive)."); return (1); } - xff = tmp; + rrdcreate_config.xff = tmp; + } + else if (strcasecmp ("WritesPerSecond", key) == 0) + { + double wps = atof (value); + + if (wps < 0.0) + { + fprintf (stderr, "rrdtool: `WritesPerSecond' must be " + "greater than or equal to zero."); + return (1); + } + else if (wps == 0.0) + { + write_rate = 0.0; + } + else + { + write_rate = 1.0 / wps; + } + } + else if (strcasecmp ("RandomTimeout", key) == 0) + { + int tmp; + + tmp = atoi (value); + if (tmp < 0) + { + fprintf (stderr, "rrdtool: `RandomTimeout' must " + "be greater than or equal to zero.\n"); + ERROR ("rrdtool: `RandomTimeout' must " + "be greater then or equal to zero."); + } + else + { + random_timeout = tmp; + } } else { @@ -833,51 +1087,100 @@ static int rrd_shutdown (void) { pthread_mutex_lock (&cache_lock); rrd_cache_flush (-1); - if (cache != NULL) - avl_destroy (cache); - cache = NULL; pthread_mutex_unlock (&cache_lock); + pthread_mutex_lock (&queue_lock); + do_shutdown = 1; + pthread_cond_signal (&queue_cond); + pthread_mutex_unlock (&queue_lock); + + if ((queue_thread_running != 0) + && ((queue_head != NULL) || (flushq_head != NULL))) + { + INFO ("rrdtool plugin: Shutting down the queue thread. " + "This may take a while."); + } + else if (queue_thread_running != 0) + { + INFO ("rrdtool plugin: Shutting down the queue thread."); + } + + /* Wait for all the values to be written to disk before returning. */ + if (queue_thread_running != 0) + { + pthread_join (queue_thread, NULL); + memset (&queue_thread, 0, sizeof (queue_thread)); + queue_thread_running = 0; + DEBUG ("rrdtool plugin: queue_thread exited."); + } + + rrd_cache_destroy (); + return (0); } /* int rrd_shutdown */ static int rrd_init (void) { - if (stepsize <= 0) - stepsize = interval_g; - if (heartbeat <= 0) - heartbeat = 2 * interval_g; + static int init_once = 0; + int status; - if (heartbeat < interval_g) + if (init_once != 0) + return (0); + init_once = 1; + + if (rrdcreate_config.stepsize < 0) + rrdcreate_config.stepsize = 0; + if (rrdcreate_config.heartbeat <= 0) + rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize; + + if ((rrdcreate_config.heartbeat > 0) + && (rrdcreate_config.heartbeat < interval_g)) WARNING ("rrdtool plugin: Your `heartbeat' is " "smaller than your `interval'. This will " "likely cause problems."); - else if (stepsize < interval_g) + else if ((rrdcreate_config.stepsize > 0) + && (rrdcreate_config.stepsize < interval_g)) WARNING ("rrdtool plugin: Your `stepsize' is " "smaller than your `interval'. This will " "create needlessly big RRD-files."); + /* Set the cache up */ pthread_mutex_lock (&cache_lock); + + cache = c_avl_create ((int (*) (const void *, const void *)) strcmp); + if (cache == NULL) + { + ERROR ("rrdtool plugin: c_avl_create failed."); + return (-1); + } + + cache_flush_last = time (NULL); if (cache_timeout < 2) { cache_timeout = 0; cache_flush_timeout = 0; } - else - { - if (cache_flush_timeout < cache_timeout) - cache_flush_timeout = 10 * cache_timeout; + else if (cache_flush_timeout < cache_timeout) + cache_flush_timeout = 10 * cache_timeout; - cache = avl_create ((int (*) (const void *, const void *)) strcmp); - cache_flush_last = time (NULL); - plugin_register_shutdown ("rrdtool", rrd_shutdown); - } pthread_mutex_unlock (&cache_lock); + status = pthread_create (&queue_thread, /* attr = */ NULL, + rrd_queue_thread, /* args = */ NULL); + if (status != 0) + { + ERROR ("rrdtool plugin: Cannot create queue-thread."); + return (-1); + } + queue_thread_running = 1; + DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;" " heartbeat = %i; rrarows = %i; xff = %lf;", (datadir == NULL) ? "(null)" : datadir, - stepsize, heartbeat, rrarows, xff); + rrdcreate_config.stepsize, + rrdcreate_config.heartbeat, + rrdcreate_config.rrarows, + rrdcreate_config.xff); return (0); } /* int rrd_init */ @@ -887,5 +1190,7 @@ void module_register (void) plugin_register_config ("rrdtool", rrd_config, config_keys, config_keys_num); plugin_register_init ("rrdtool", rrd_init); - plugin_register_write ("rrdtool", rrd_write); + plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL); + plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL); + plugin_register_shutdown ("rrdtool", rrd_shutdown); }