X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdtool.c;h=27e64c11c2bab98bc430b0355d4514e38fc82e14;hb=4e57613ae19a71a0f180f0648223a139f48c932c;hp=3b2c54fc9265bb49c05618164af4dc7d8f8ce121;hpb=a8d1499f57d3ffaff4c0ef3259a9fbf21b2953c5;p=collectd.git diff --git a/src/rrdtool.c b/src/rrdtool.c index 3b2c54fc..27e64c11 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -1,6 +1,6 @@ /** * collectd - src/rrdtool.c - * Copyright (C) 2006,2007 Florian octo Forster + * Copyright (C) 2006-2008 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -42,11 +42,19 @@ struct rrd_cache_s enum { FLAG_NONE = 0x00, - FLAG_QUEUED = 0x01 + FLAG_QUEUED = 0x01, + FLAG_FLUSHQ = 0x02 } flags; }; typedef struct rrd_cache_s rrd_cache_t; +enum rrd_queue_dir_e +{ + QUEUE_INSERT_FRONT, + QUEUE_INSERT_BACK +}; +typedef enum rrd_queue_dir_e rrd_queue_dir_t; + struct rrd_queue_s { char *filename; @@ -87,7 +95,8 @@ static const char *config_keys[] = "HeartBeat", "RRARows", "RRATimespan", - "XFF" + "XFF", + "WritesPerSecond" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); @@ -99,6 +108,7 @@ static int stepsize = 0; static int heartbeat = 0; static int rrarows = 1200; static double xff = 0.1; +static double write_rate = 0.0; /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time, * ALWAYS lock `cache_lock' first! */ @@ -110,6 +120,8 @@ static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static rrd_queue_t *queue_head = NULL; static rrd_queue_t *queue_tail = NULL; +static rrd_queue_t *flushq_head = NULL; +static rrd_queue_t *flushq_tail = NULL; static pthread_t queue_thread = 0; static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -211,7 +223,7 @@ static int rra_get (char ***ret, const value_list_t *vl) if (rra_num >= rra_max) break; - if (snprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u", + if (ssnprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u", rra_types[j], xff, cdp_len, cdp_num) >= sizeof (buffer)) { @@ -288,22 +300,16 @@ static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl) sstrncpy (min, "U", sizeof (min)); } else - { - snprintf (min, sizeof (min), "%lf", d->min); - min[sizeof (min) - 1] = '\0'; - } + ssnprintf (min, sizeof (min), "%lf", d->min); if (isnan (d->max)) { sstrncpy (max, "U", sizeof (max)); } else - { - snprintf (max, sizeof (max), "%lf", d->max); - max[sizeof (max) - 1] = '\0'; - } + ssnprintf (max, sizeof (max), "%lf", d->max); - status = snprintf (buffer, sizeof (buffer), + status = ssnprintf (buffer, sizeof (buffer), "DS:%s:%s:%i:%s:%s", d->name, type, (heartbeat > 0) ? heartbeat : (2 * vl->interval), @@ -335,7 +341,7 @@ static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl) #if HAVE_THREADSAFE_LIBRRD static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, - int argc, char **argv) + int argc, const char **argv) { int status; @@ -353,7 +359,8 @@ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, return (status); } /* int srrd_create */ -static int srrd_update (char *filename, char *template, int argc, char **argv) +static int srrd_update (char *filename, char *template, + int argc, const char **argv) { int status; @@ -374,7 +381,7 @@ static int srrd_update (char *filename, char *template, int argc, char **argv) #else /* !HAVE_THREADSAFE_LIBRRD */ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, - int argc, char **argv) + int argc, const char **argv) { int status; @@ -395,10 +402,8 @@ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, if (last_up == 0) last_up = time (NULL) - 10; - snprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step); - pdp_step_str[sizeof (pdp_step_str) - 1] = '\0'; - snprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up); - last_up_str[sizeof (last_up_str) - 1] = '\0'; + ssnprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step); + ssnprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up); new_argv[0] = "create"; new_argv[1] = filename; @@ -428,7 +433,8 @@ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, return (status); } /* int srrd_create */ -static int srrd_update (char *filename, char *template, int argc, char **argv) +static int srrd_update (char *filename, char *template, + int argc, const char **argv) { int status; @@ -513,7 +519,7 @@ static int rrd_create_file (char *filename, const data_set_t *ds, const value_li status = srrd_create (filename, (stepsize > 0) ? stepsize : vl->interval, vl->time - 10, - argc, argv); + argc, (const char **)argv); free (argv); ds_free (ds_num, ds_def); @@ -531,7 +537,7 @@ static int value_list_to_string (char *buffer, int buffer_len, memset (buffer, '\0', buffer_len); - status = snprintf (buffer, buffer_len, "%u", (unsigned int) vl->time); + status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time); if ((status < 1) || (status >= buffer_len)) return (-1); offset = status; @@ -543,10 +549,10 @@ static int value_list_to_string (char *buffer, int buffer_len, return (-1); if (ds->ds[i].type == DS_TYPE_COUNTER) - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, ":%llu", vl->values[i].counter); else - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, ":%lf", vl->values[i].gauge); if ((status < 1) || (status >= (buffer_len - offset))) @@ -566,35 +572,35 @@ static int value_list_to_filename (char *buffer, int buffer_len, if (datadir != NULL) { - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", datadir); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; } - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", vl->host); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; if (strlen (vl->plugin_instance) > 0) - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s-%s/", vl->plugin, vl->plugin_instance); else - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", vl->plugin); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; if (strlen (vl->type_instance) > 0) - status = snprintf (buffer + offset, buffer_len - offset, - "%s-%s.rrd", ds->type, vl->type_instance); + status = ssnprintf (buffer + offset, buffer_len - offset, + "%s-%s.rrd", vl->type, vl->type_instance); else - status = snprintf (buffer + offset, buffer_len - offset, - "%s.rrd", ds->type); + status = ssnprintf (buffer + offset, buffer_len - offset, + "%s.rrd", vl->type); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; @@ -604,35 +610,92 @@ static int value_list_to_filename (char *buffer, int buffer_len, static void *rrd_queue_thread (void *data) { + struct timeval tv_next_update; + struct timeval tv_now; + + gettimeofday (&tv_next_update, /* timezone = */ NULL); + while (42) { rrd_queue_t *queue_entry; rrd_cache_t *cache_entry; char **values; int values_num; + int status; int i; - /* XXX: If you need to lock both, cache_lock and queue_lock, at - * the same time, ALWAYS lock `cache_lock' first! */ - - /* wait until an entry is available */ - pthread_mutex_lock (&queue_lock); - while ((queue_head == NULL) && (do_shutdown == 0)) - pthread_cond_wait (&queue_cond, &queue_lock); - - /* We're in the shutdown phase */ - if (queue_head == NULL) - { - pthread_mutex_unlock (&queue_lock); - break; - } - - /* Dequeue the first entry */ - queue_entry = queue_head; - if (queue_head == queue_tail) - queue_head = queue_tail = NULL; - else - queue_head = queue_head->next; + pthread_mutex_lock (&queue_lock); + /* Wait for values to arrive */ + while (true) + { + struct timespec ts_wait; + + while ((flushq_head == NULL) && (queue_head == NULL) + && (do_shutdown == 0)) + pthread_cond_wait (&queue_cond, &queue_lock); + + if ((flushq_head == NULL) && (queue_head == NULL)) + break; + + /* Don't delay if there's something to flush */ + if (flushq_head != NULL) + break; + + /* Don't delay if we're shutting down */ + if (do_shutdown != 0) + break; + + /* Don't delay if no delay was configured. */ + if (write_rate <= 0.0) + break; + + gettimeofday (&tv_now, /* timezone = */ NULL); + status = timeval_sub_timespec (&tv_next_update, &tv_now, + &ts_wait); + /* We're good to go */ + if (status != 0) + break; + + /* We're supposed to wait a bit with this update, so we'll + * wait for the next addition to the queue or to the end of + * the wait period - whichever comes first. */ + ts_wait.tv_sec = tv_next_update.tv_sec; + ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec; + + status = pthread_cond_timedwait (&queue_cond, &queue_lock, + &ts_wait); + if (status == ETIMEDOUT) + break; + } /* while (true) */ + + /* XXX: If you need to lock both, cache_lock and queue_lock, at + * the same time, ALWAYS lock `cache_lock' first! */ + + /* We're in the shutdown phase */ + if ((flushq_head == NULL) && (queue_head == NULL)) + { + pthread_mutex_unlock (&queue_lock); + break; + } + + if (flushq_head != NULL) + { + /* Dequeue the first flush entry */ + queue_entry = flushq_head; + if (flushq_head == flushq_tail) + flushq_head = flushq_tail = NULL; + else + flushq_head = flushq_head->next; + } + else /* if (queue_head != NULL) */ + { + /* Dequeue the first regular entry */ + queue_entry = queue_head; + if (queue_head == queue_tail) + queue_head = queue_tail = NULL; + else + queue_head = queue_head->next; + } /* Unlock the queue again */ pthread_mutex_unlock (&queue_lock); @@ -641,19 +704,45 @@ static void *rrd_queue_thread (void *data) * we make a copy of it's values */ pthread_mutex_lock (&cache_lock); - c_avl_get (cache, queue_entry->filename, (void *) &cache_entry); + status = c_avl_get (cache, queue_entry->filename, + (void *) &cache_entry); - values = cache_entry->values; - values_num = cache_entry->values_num; + if (status == 0) + { + values = cache_entry->values; + values_num = cache_entry->values_num; - cache_entry->values = NULL; - cache_entry->values_num = 0; - cache_entry->flags = FLAG_NONE; + cache_entry->values = NULL; + cache_entry->values_num = 0; + cache_entry->flags = FLAG_NONE; + } pthread_mutex_unlock (&cache_lock); + if (status != 0) + { + sfree (queue_entry->filename); + sfree (queue_entry); + continue; + } + + /* Update `tv_next_update' */ + if (write_rate > 0.0) + { + gettimeofday (&tv_now, /* timezone = */ NULL); + tv_next_update.tv_sec = tv_now.tv_sec; + tv_next_update.tv_usec = tv_now.tv_usec + + ((suseconds_t) (1000000 * write_rate)); + while (tv_next_update.tv_usec > 1000000) + { + tv_next_update.tv_sec++; + tv_next_update.tv_usec -= 1000000; + } + } + /* Write the values to the RRD-file */ - srrd_update (queue_entry->filename, NULL, values_num, values); + srrd_update (queue_entry->filename, NULL, + values_num, (const char **)values); DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s", values_num, queue_entry->filename); @@ -675,36 +764,79 @@ static void *rrd_queue_thread (void *data) return ((void *) 0); } /* void *rrd_queue_thread */ -static int rrd_queue_cache_entry (const char *filename) +static int rrd_queue_enqueue (const char *filename, + rrd_queue_t **head, rrd_queue_t **tail) { - rrd_queue_t *queue_entry; + rrd_queue_t *queue_entry; - queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t)); - if (queue_entry == NULL) - return (-1); + queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t)); + if (queue_entry == NULL) + return (-1); - queue_entry->filename = strdup (filename); - if (queue_entry->filename == NULL) - { - free (queue_entry); - return (-1); - } + queue_entry->filename = strdup (filename); + if (queue_entry->filename == NULL) + { + free (queue_entry); + return (-1); + } - queue_entry->next = NULL; + queue_entry->next = NULL; - pthread_mutex_lock (&queue_lock); - if (queue_tail == NULL) - queue_head = queue_entry; - else - queue_tail->next = queue_entry; - queue_tail = queue_entry; - pthread_cond_signal (&queue_cond); - pthread_mutex_unlock (&queue_lock); + pthread_mutex_lock (&queue_lock); - DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename); + if (*tail == NULL) + *head = queue_entry; + else + (*tail)->next = queue_entry; + *tail = queue_entry; - return (0); -} /* int rrd_queue_cache_entry */ + pthread_cond_signal (&queue_cond); + pthread_mutex_unlock (&queue_lock); + + return (0); +} /* int rrd_queue_enqueue */ + +static int rrd_queue_dequeue (const char *filename, + rrd_queue_t **head, rrd_queue_t **tail) +{ + rrd_queue_t *this; + rrd_queue_t *prev; + + pthread_mutex_lock (&queue_lock); + + prev = NULL; + this = *head; + + while (this != NULL) + { + if (strcmp (this->filename, filename) == 0) + break; + + prev = this; + this = this->next; + } + + if (this == NULL) + { + pthread_mutex_unlock (&queue_lock); + return (-1); + } + + if (prev == NULL) + *head = this->next; + else + prev->next = this->next; + + if (this->next == NULL) + *tail = prev; + + pthread_mutex_unlock (&queue_lock); + + sfree (this->filename); + sfree (this); + + return (0); +} /* int rrd_queue_dequeue */ static void rrd_cache_flush (int timeout) { @@ -726,13 +858,16 @@ static void rrd_cache_flush (int timeout) iter = c_avl_get_iterator (cache); while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) { - if (rc->flags == FLAG_QUEUED) + if (rc->flags != FLAG_NONE) continue; else if ((now - rc->first_value) < timeout) continue; else if (rc->values_num > 0) { - if (rrd_queue_cache_entry (key) == 0) + int status; + + status = rrd_queue_enqueue (key, &queue_head, &queue_tail); + if (status == 0) rc->flags = FLAG_QUEUED; } else /* ancient and no values -> waste of memory */ @@ -778,6 +913,63 @@ static void rrd_cache_flush (int timeout) cache_flush_last = now; } /* void rrd_cache_flush */ +static int rrd_cache_flush_identifier (int timeout, const char *identifier) +{ + rrd_cache_t *rc; + time_t now; + int status; + char key[2048]; + + if (identifier == NULL) + { + rrd_cache_flush (timeout); + return (0); + } + + now = time (NULL); + + if (datadir == NULL) + snprintf (key, sizeof (key), "%s.rrd", + identifier); + else + snprintf (key, sizeof (key), "%s/%s.rrd", + datadir, identifier); + key[sizeof (key) - 1] = 0; + + status = c_avl_get (cache, key, (void *) &rc); + if (status != 0) + { + WARNING ("rrdtool plugin: rrd_cache_flush_identifier: " + "c_avl_get (%s) failed. Does that file really exist?", + key); + return (status); + } + + if (rc->flags == FLAG_FLUSHQ) + { + status = 0; + } + else if (rc->flags == FLAG_QUEUED) + { + rrd_queue_dequeue (key, &queue_head, &queue_tail); + status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail); + if (status == 0) + rc->flags = FLAG_FLUSHQ; + } + else if ((now - rc->first_value) < timeout) + { + status = 0; + } + else if (rc->values_num > 0) + { + status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail); + if (status == 0) + rc->flags = FLAG_FLUSHQ; + } + + return (status); +} /* int rrd_cache_flush_identifier */ + static int rrd_cache_insert (const char *filename, const char *value, time_t value_time) { @@ -787,6 +979,15 @@ static int rrd_cache_insert (const char *filename, pthread_mutex_lock (&cache_lock); + /* This shouldn't happen, but it did happen at least once, so we'll be + * careful. */ + if (cache == NULL) + { + pthread_mutex_unlock (&cache_lock); + WARNING ("rrdtool plugin: cache == NULL."); + return (-1); + } + c_avl_get (cache, filename, (void *) &rc); if (rc == NULL) @@ -864,17 +1065,20 @@ static int rrd_cache_insert (const char *filename, } DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; " - "values_num = %i; age = %u;", + "values_num = %i; age = %lu;", filename, rc->values_num, - rc->last_value - rc->first_value); + (unsigned long)(rc->last_value - rc->first_value)); if ((rc->last_value - rc->first_value) >= cache_timeout) { /* XXX: If you need to lock both, cache_lock and queue_lock, at * the same time, ALWAYS lock `cache_lock' first! */ - if (rc->flags != FLAG_QUEUED) + if (rc->flags == FLAG_NONE) { - if (rrd_queue_cache_entry (filename) == 0) + int status; + + status = rrd_queue_enqueue (filename, &queue_head, &queue_tail); + if (status == 0) rc->flags = FLAG_QUEUED; } else @@ -887,7 +1091,6 @@ static int rrd_cache_insert (const char *filename, ((time (NULL) - cache_flush_last) > cache_flush_timeout)) rrd_cache_flush (cache_flush_timeout); - pthread_mutex_unlock (&cache_lock); return (0); @@ -913,6 +1116,11 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) char values[512]; int status; + if (0 != strcmp (ds->type, vl->type)) { + ERROR ("rrdtool plugin: DS type does not match value list type"); + return -1; + } + if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0) return (-1); @@ -947,7 +1155,7 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) return (status); } /* int rrd_write */ -static int rrd_flush (const int timeout) +static int rrd_flush (int timeout, const char *identifier) { pthread_mutex_lock (&cache_lock); @@ -956,7 +1164,8 @@ static int rrd_flush (const int timeout) return (0); } - rrd_cache_flush (timeout); + rrd_cache_flush_identifier (timeout, identifier); + pthread_mutex_unlock (&cache_lock); return (0); } /* int rrd_flush */ @@ -970,6 +1179,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `CacheTimeout' must " "be greater than 0.\n"); + ERROR ("rrdtool: `CacheTimeout' must " + "be greater than 0.\n"); return (1); } cache_timeout = tmp; @@ -981,6 +1192,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `CacheFlush' must " "be greater than 0.\n"); + ERROR ("rrdtool: `CacheFlush' must " + "be greater than 0.\n"); return (1); } cache_flush_timeout = tmp; @@ -1024,6 +1237,8 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `RRARows' must " "be greater than 0.\n"); + ERROR ("rrdtool: `RRARows' must " + "be greater than 0.\n"); return (1); } rrarows = tmp; @@ -1050,6 +1265,7 @@ static int rrd_config (const char *key, const char *value) if (tmp_alloc == NULL) { fprintf (stderr, "rrdtool: realloc failed.\n"); + ERROR ("rrdtool: realloc failed.\n"); free (value_copy); return (1); } @@ -1073,10 +1289,31 @@ static int rrd_config (const char *key, const char *value) { fprintf (stderr, "rrdtool: `XFF' must " "be in the range 0 to 1 (exclusive)."); + ERROR ("rrdtool: `XFF' must " + "be in the range 0 to 1 (exclusive)."); return (1); } xff = tmp; } + else if (strcasecmp ("WritesPerSecond", key) == 0) + { + double wps = atof (value); + + if (wps < 0.0) + { + fprintf (stderr, "rrdtool: `WritesPerSecond' must be " + "greater than or equal to zero."); + return (1); + } + else if (wps == 0.0) + { + write_rate = 0.0; + } + else + { + write_rate = 1.0 / wps; + } + } else { return (-1);