X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdtool.c;h=243a8c8ff90feff941e5d20e179f63ef2e15f815;hb=34e095d9e7c605ded713203f21e4f00c8d6b6d44;hp=9b236cb85553a5d632d35760bb266c97bfba27ff;hpb=332cf199806de625661fa3a9bf9555f7a47ba9f9;p=collectd.git diff --git a/src/rrdtool.c b/src/rrdtool.c index 9b236cb8..243a8c8f 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -111,7 +111,8 @@ static rrd_queue_t *queue_head = NULL; static rrd_queue_t *queue_tail = NULL; static rrd_queue_t *flushq_head = NULL; static rrd_queue_t *flushq_tail = NULL; -static pthread_t queue_thread = 0; +static pthread_t queue_thread; +static int queue_thread_running = 1; static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -223,7 +224,7 @@ static int value_list_to_string (char *buffer, int buffer_len, } /* int value_list_to_string */ static int value_list_to_filename (char *buffer, int buffer_len, - const data_set_t *ds, const value_list_t *vl) + const data_set_t __attribute__((unused)) *ds, const value_list_t *vl) { int offset = 0; int status; @@ -266,7 +267,7 @@ static int value_list_to_filename (char *buffer, int buffer_len, return (0); } /* int value_list_to_filename */ -static void *rrd_queue_thread (void *data) +static void *rrd_queue_thread (void __attribute__((unused)) *data) { struct timeval tv_next_update; struct timeval tv_now; @@ -279,14 +280,17 @@ static void *rrd_queue_thread (void *data) rrd_cache_t *cache_entry; char **values; int values_num; + int status; int i; + values = NULL; + values_num = 0; + pthread_mutex_lock (&queue_lock); /* Wait for values to arrive */ while (true) { struct timespec ts_wait; - int status; while ((flushq_head == NULL) && (queue_head == NULL) && (do_shutdown == 0)) @@ -308,10 +312,9 @@ static void *rrd_queue_thread (void *data) break; gettimeofday (&tv_now, /* timezone = */ NULL); - status = timeval_sub_timespec (&tv_next_update, &tv_now, - &ts_wait); + status = timeval_cmp (tv_next_update, tv_now, NULL); /* We're good to go */ - if (status != 0) + if (status <= 0) break; /* We're supposed to wait a bit with this update, so we'll @@ -362,17 +365,28 @@ static void *rrd_queue_thread (void *data) * we make a copy of it's values */ pthread_mutex_lock (&cache_lock); - c_avl_get (cache, queue_entry->filename, (void *) &cache_entry); + status = c_avl_get (cache, queue_entry->filename, + (void *) &cache_entry); - values = cache_entry->values; - values_num = cache_entry->values_num; + if (status == 0) + { + values = cache_entry->values; + values_num = cache_entry->values_num; - cache_entry->values = NULL; - cache_entry->values_num = 0; - cache_entry->flags = FLAG_NONE; + cache_entry->values = NULL; + cache_entry->values_num = 0; + cache_entry->flags = FLAG_NONE; + } pthread_mutex_unlock (&cache_lock); + if (status != 0) + { + sfree (queue_entry->filename); + sfree (queue_entry); + continue; + } + /* Update `tv_next_update' */ if (write_rate > 0.0) { @@ -390,8 +404,9 @@ static void *rrd_queue_thread (void *data) /* Write the values to the RRD-file */ srrd_update (queue_entry->filename, NULL, values_num, (const char **)values); - DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s", - values_num, queue_entry->filename); + DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s", + values_num, (values_num == 1) ? "" : "s", + queue_entry->filename); for (i = 0; i < values_num; i++) { @@ -586,7 +601,7 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier) status = c_avl_get (cache, key, (void *) &rc); if (status != 0) { - WARNING ("rrdtool plugin: rrd_cache_flush_identifier: " + INFO ("rrdtool plugin: rrd_cache_flush_identifier: " "c_avl_get (%s) failed. Does that file really exist?", key); return (status); @@ -626,6 +641,15 @@ static int rrd_cache_insert (const char *filename, pthread_mutex_lock (&cache_lock); + /* This shouldn't happen, but it did happen at least once, so we'll be + * careful. */ + if (cache == NULL) + { + pthread_mutex_unlock (&cache_lock); + WARNING ("rrdtool plugin: cache == NULL."); + return (-1); + } + c_avl_get (cache, filename, (void *) &rc); if (rc == NULL) @@ -747,7 +771,8 @@ static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr) return (0); } /* int rrd_compare_numeric */ -static int rrd_write (const data_set_t *ds, const value_list_t *vl) +static int rrd_write (const data_set_t *ds, const value_list_t *vl, + user_data_t __attribute__((unused)) *user_data) { struct stat statbuf; char filename[512]; @@ -795,7 +820,8 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) return (status); } /* int rrd_write */ -static int rrd_flush (int timeout, const char *identifier) +static int rrd_flush (int timeout, const char *identifier, + user_data_t __attribute__((unused)) *user_data) { pthread_mutex_lock (&cache_lock); @@ -819,6 +845,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `CacheTimeout' must " "be greater than 0.\n"); + ERROR ("rrdtool: `CacheTimeout' must " + "be greater than 0.\n"); return (1); } cache_timeout = tmp; @@ -830,6 +858,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `CacheFlush' must " "be greater than 0.\n"); + ERROR ("rrdtool: `CacheFlush' must " + "be greater than 0.\n"); return (1); } cache_flush_timeout = tmp; @@ -873,6 +903,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `RRARows' must " "be greater than 0.\n"); + ERROR ("rrdtool: `RRARows' must " + "be greater than 0.\n"); return (1); } rrdcreate_config.rrarows = tmp; @@ -899,6 +931,7 @@ static int rrd_config (const char *key, const char *value) if (tmp_alloc == NULL) { fprintf (stderr, "rrdtool: realloc failed.\n"); + ERROR ("rrdtool: realloc failed.\n"); free (value_copy); return (1); } @@ -922,6 +955,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `XFF' must " "be in the range 0 to 1 (exclusive)."); + ERROR ("rrdtool: `XFF' must " + "be in the range 0 to 1 (exclusive)."); return (1); } rrdcreate_config.xff = tmp; @@ -963,21 +998,40 @@ static int rrd_shutdown (void) pthread_cond_signal (&queue_cond); pthread_mutex_unlock (&queue_lock); + if ((queue_thread_running != 0) + && ((queue_head != NULL) || (flushq_head != NULL))) + { + INFO ("rrdtool plugin: Shutting down the queue thread. " + "This may take a while."); + } + else if (queue_thread_running != 0) + { + INFO ("rrdtool plugin: Shutting down the queue thread."); + } + /* Wait for all the values to be written to disk before returning. */ - if (queue_thread != 0) + if (queue_thread_running != 0) { pthread_join (queue_thread, NULL); - queue_thread = 0; + memset (&queue_thread, 0, sizeof (queue_thread)); + queue_thread_running = 0; DEBUG ("rrdtool plugin: queue_thread exited."); } + /* TODO: Maybe it'd be a good idea to free the cache here.. */ + return (0); } /* int rrd_shutdown */ static int rrd_init (void) { + static int init_once = 0; int status; + if (init_once != 0) + return (0); + init_once = 1; + if (rrdcreate_config.stepsize < 0) rrdcreate_config.stepsize = 0; if (rrdcreate_config.heartbeat <= 0) @@ -1015,12 +1069,14 @@ static int rrd_init (void) pthread_mutex_unlock (&cache_lock); - status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL); + status = pthread_create (&queue_thread, /* attr = */ NULL, + rrd_queue_thread, /* args = */ NULL); if (status != 0) { ERROR ("rrdtool plugin: Cannot create queue-thread."); return (-1); } + queue_thread_running = 1; DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;" " heartbeat = %i; rrarows = %i; xff = %lf;", @@ -1038,7 +1094,7 @@ void module_register (void) plugin_register_config ("rrdtool", rrd_config, config_keys, config_keys_num); plugin_register_init ("rrdtool", rrd_init); - plugin_register_write ("rrdtool", rrd_write); - plugin_register_flush ("rrdtool", rrd_flush); + plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL); + plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL); plugin_register_shutdown ("rrdtool", rrd_shutdown); }