X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdtool.c;h=2b384d09d7690b113aeadd5b4fec588f72b9b4ba;hb=c6da31fb30c2fb1d131f92efcde0b3ec9a010b2c;hp=ef1b25cd42e17432affd93ec2c199f881f116394;hpb=0171b227b670c7ca338b26bfaff303fd7cf2d023;p=collectd.git diff --git a/src/rrdtool.c b/src/rrdtool.c index ef1b25cd..2b384d09 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -1,6 +1,8 @@ /** * collectd - src/rrdtool.c * Copyright (C) 2006-2008 Florian octo Forster + * Copyright (C) 2008-2008 Sebastian Harl + * Copyright (C) 2009 Mariusz Gronczewski * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -17,6 +19,8 @@ * * Authors: * Florian octo Forster + * Sebastian Harl + * Mariusz Gronczewski **/ #include "collectd.h" @@ -40,6 +44,7 @@ struct rrd_cache_s char **values; time_t first_value; time_t last_value; + int random_variation; enum { FLAG_NONE = 0x00, @@ -76,7 +81,8 @@ static const char *config_keys[] = "RRARows", "RRATimespan", "XFF", - "WritesPerSecond" + "WritesPerSecond", + "RandomTimeout" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); @@ -103,6 +109,7 @@ static rrdcreate_config_t rrdcreate_config = * ALWAYS lock `cache_lock' first! */ static int cache_timeout = 0; static int cache_flush_timeout = 0; +static int random_timeout = 1; static time_t cache_flush_last; static c_avl_tree_t *cache = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; @@ -111,7 +118,8 @@ static rrd_queue_t *queue_head = NULL; static rrd_queue_t *queue_tail = NULL; static rrd_queue_t *flushq_head = NULL; static rrd_queue_t *flushq_tail = NULL; -static pthread_t queue_thread = 0; +static pthread_t queue_thread; +static int queue_thread_running = 1; static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -177,7 +185,7 @@ static int srrd_update (char *filename, char *template, if (status != 0) { WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s", - argv[1], rrd_get_error ()); + filename, rrd_get_error ()); } sfree (new_argv); @@ -203,15 +211,23 @@ static int value_list_to_string (char *buffer, int buffer_len, for (i = 0; i < ds->ds_num; i++) { if ((ds->ds[i].type != DS_TYPE_COUNTER) - && (ds->ds[i].type != DS_TYPE_GAUGE)) + && (ds->ds[i].type != DS_TYPE_GAUGE) + && (ds->ds[i].type != DS_TYPE_DERIVE) + && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) return (-1); if (ds->ds[i].type == DS_TYPE_COUNTER) status = ssnprintf (buffer + offset, buffer_len - offset, ":%llu", vl->values[i].counter); - else + else if (ds->ds[i].type == DS_TYPE_GAUGE) status = ssnprintf (buffer + offset, buffer_len - offset, ":%lf", vl->values[i].gauge); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + status = ssnprintf (buffer + offset, buffer_len - offset, + ":%"PRIi64, vl->values[i].derive); + else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ + status = ssnprintf (buffer + offset, buffer_len - offset, + ":%"PRIu64, vl->values[i].absolute); if ((status < 1) || (status >= (buffer_len - offset))) return (-1); @@ -403,8 +419,9 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data) /* Write the values to the RRD-file */ srrd_update (queue_entry->filename, NULL, values_num, (const char **)values); - DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s", - values_num, queue_entry->filename); + DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s", + values_num, (values_num == 1) ? "" : "s", + queue_entry->filename); for (i = 0; i < values_num; i++) { @@ -415,11 +432,6 @@ static void *rrd_queue_thread (void __attribute__((unused)) *data) sfree (queue_entry); } /* while (42) */ - pthread_mutex_lock (&cache_lock); - c_avl_destroy (cache); - cache = NULL; - pthread_mutex_unlock (&cache_lock); - pthread_exit ((void *) 0); return ((void *) 0); } /* void *rrd_queue_thread */ @@ -659,6 +671,7 @@ static int rrd_cache_insert (const char *filename, rc->values = NULL; rc->first_value = 0; rc->last_value = 0; + rc->random_variation = 0; rc->flags = FLAG_NONE; new_rc = 1; } @@ -666,7 +679,7 @@ static int rrd_cache_insert (const char *filename, if (rc->last_value >= value_time) { pthread_mutex_unlock (&cache_lock); - WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)", + DEBUG ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)", (unsigned int) rc->last_value, (unsigned int) value_time); return (-1); @@ -729,7 +742,7 @@ static int rrd_cache_insert (const char *filename, filename, rc->values_num, (unsigned long)(rc->last_value - rc->first_value)); - if ((rc->last_value - rc->first_value) >= cache_timeout) + if ((rc->last_value + rc->random_variation - rc->first_value) >= cache_timeout) { /* XXX: If you need to lock both, cache_lock and queue_lock, at * the same time, ALWAYS lock `cache_lock' first! */ @@ -740,6 +753,18 @@ static int rrd_cache_insert (const char *filename, status = rrd_queue_enqueue (filename, &queue_head, &queue_tail); if (status == 0) rc->flags = FLAG_QUEUED; + + /* Update the jitter value. Negative values are + * slightly preferred. */ + if (random_timeout > 0) + { + rc->random_variation = (rand () % (2 * random_timeout)) + - random_timeout; + } + else + { + rc->random_variation = 0; + } } else { @@ -756,6 +781,59 @@ static int rrd_cache_insert (const char *filename, return (0); } /* int rrd_cache_insert */ +static int rrd_cache_destroy (void) /* {{{ */ +{ + void *key = NULL; + void *value = NULL; + + int non_empty = 0; + + pthread_mutex_lock (&cache_lock); + + if (cache == NULL) + { + pthread_mutex_unlock (&cache_lock); + return (0); + } + + while (c_avl_pick (cache, &key, &value) == 0) + { + rrd_cache_t *rc; + int i; + + sfree (key); + key = NULL; + + rc = value; + value = NULL; + + if (rc->values_num > 0) + non_empty++; + + for (i = 0; i < rc->values_num; i++) + sfree (rc->values[i]); + sfree (rc->values); + sfree (rc); + } + + c_avl_destroy (cache); + cache = NULL; + + if (non_empty > 0) + { + INFO ("rrdtool plugin: %i cache %s had values when destroying the cache.", + non_empty, (non_empty == 1) ? "entry" : "entries"); + } + else + { + DEBUG ("rrdtool plugin: No values have been lost " + "when destroying the cache."); + } + + pthread_mutex_unlock (&cache_lock); + return (0); +} /* }}} int rrd_cache_destroy */ + static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr) { int a = *((int *) a_ptr); @@ -769,13 +847,17 @@ static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr) return (0); } /* int rrd_compare_numeric */ -static int rrd_write (const data_set_t *ds, const value_list_t *vl) +static int rrd_write (const data_set_t *ds, const value_list_t *vl, + user_data_t __attribute__((unused)) *user_data) { struct stat statbuf; char filename[512]; char values[512]; int status; + if (do_shutdown) + return (0); + if (0 != strcmp (ds->type, vl->type)) { ERROR ("rrdtool plugin: DS type does not match value list type"); return -1; @@ -817,7 +899,8 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) return (status); } /* int rrd_write */ -static int rrd_flush (int timeout, const char *identifier) +static int rrd_flush (int timeout, const char *identifier, + user_data_t __attribute__((unused)) *user_data) { pthread_mutex_lock (&cache_lock); @@ -976,6 +1059,23 @@ static int rrd_config (const char *key, const char *value) write_rate = 1.0 / wps; } } + else if (strcasecmp ("RandomTimeout", key) == 0) + { + int tmp; + + tmp = atoi (value); + if (tmp < 0) + { + fprintf (stderr, "rrdtool: `RandomTimeout' must " + "be greater than or equal to zero.\n"); + ERROR ("rrdtool: `RandomTimeout' must " + "be greater then or equal to zero."); + } + else + { + random_timeout = tmp; + } + } else { return (-1); @@ -994,14 +1094,28 @@ static int rrd_shutdown (void) pthread_cond_signal (&queue_cond); pthread_mutex_unlock (&queue_lock); + if ((queue_thread_running != 0) + && ((queue_head != NULL) || (flushq_head != NULL))) + { + INFO ("rrdtool plugin: Shutting down the queue thread. " + "This may take a while."); + } + else if (queue_thread_running != 0) + { + INFO ("rrdtool plugin: Shutting down the queue thread."); + } + /* Wait for all the values to be written to disk before returning. */ - if (queue_thread != 0) + if (queue_thread_running != 0) { pthread_join (queue_thread, NULL); - queue_thread = 0; + memset (&queue_thread, 0, sizeof (queue_thread)); + queue_thread_running = 0; DEBUG ("rrdtool plugin: queue_thread exited."); } + rrd_cache_destroy (); + return (0); } /* int rrd_shutdown */ @@ -1051,12 +1165,14 @@ static int rrd_init (void) pthread_mutex_unlock (&cache_lock); - status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL); + status = pthread_create (&queue_thread, /* attr = */ NULL, + rrd_queue_thread, /* args = */ NULL); if (status != 0) { ERROR ("rrdtool plugin: Cannot create queue-thread."); return (-1); } + queue_thread_running = 1; DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;" " heartbeat = %i; rrarows = %i; xff = %lf;", @@ -1074,7 +1190,7 @@ void module_register (void) plugin_register_config ("rrdtool", rrd_config, config_keys, config_keys_num); plugin_register_init ("rrdtool", rrd_init); - plugin_register_write ("rrdtool", rrd_write); - plugin_register_flush ("rrdtool", rrd_flush); + plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL); + plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL); plugin_register_shutdown ("rrdtool", rrd_shutdown); }