X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdtool.c;h=7cf243671254cf261b77fe3e79d21885f321384e;hb=efbeffe4cc730d6e14ba45dddfee9ab5229f07dc;hp=0dae46b3fe7b4eb5d46e097b16ca5055cb9bbaaf;hpb=c8b0fd41dc6730981e4facf54c2e5f690f6c3c2f;p=collectd.git diff --git a/src/rrdtool.c b/src/rrdtool.c index 0dae46b3..7cf24367 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -1,6 +1,6 @@ /** * collectd - src/rrdtool.c - * Copyright (C) 2006,2007 Florian octo Forster + * Copyright (C) 2006-2008 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -24,6 +24,8 @@ #include "common.h" #include "utils_avltree.h" +#include + #if HAVE_PTHREAD_H # include #endif @@ -45,6 +47,13 @@ struct rrd_cache_s }; typedef struct rrd_cache_s rrd_cache_t; +enum rrd_queue_dir_e +{ + QUEUE_INSERT_FRONT, + QUEUE_INSERT_BACK +}; +typedef enum rrd_queue_dir_e rrd_queue_dir_t; + struct rrd_queue_s { char *filename; @@ -103,7 +112,7 @@ static double xff = 0.1; static int cache_timeout = 0; static int cache_flush_timeout = 0; static time_t cache_flush_last; -static avl_tree_t *cache = NULL; +static c_avl_tree_t *cache = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static rrd_queue_t *queue_head = NULL; @@ -193,7 +202,7 @@ static int rra_get (char ***ret, const value_list_t *vl) span = rts[i]; if ((span / ss) < rrarows) - continue; + span = ss * rrarows; if (cdp_len == 0) cdp_len = 1; @@ -209,7 +218,7 @@ static int rra_get (char ***ret, const value_list_t *vl) if (rra_num >= rra_max) break; - if (snprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u", + if (ssnprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u", rra_types[j], xff, cdp_len, cdp_num) >= sizeof (buffer)) { @@ -283,25 +292,19 @@ static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl) if (isnan (d->min)) { - strcpy (min, "U"); + sstrncpy (min, "U", sizeof (min)); } else - { - snprintf (min, sizeof (min), "%lf", d->min); - min[sizeof (min) - 1] = '\0'; - } + ssnprintf (min, sizeof (min), "%lf", d->min); if (isnan (d->max)) { - strcpy (max, "U"); + sstrncpy (max, "U", sizeof (max)); } else - { - snprintf (max, sizeof (max), "%lf", d->max); - max[sizeof (max) - 1] = '\0'; - } + ssnprintf (max, sizeof (max), "%lf", d->max); - status = snprintf (buffer, sizeof (buffer), + status = ssnprintf (buffer, sizeof (buffer), "DS:%s:%s:%i:%s:%s", d->name, type, (heartbeat > 0) ? heartbeat : (2 * vl->interval), @@ -333,14 +336,14 @@ static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl) #if HAVE_THREADSAFE_LIBRRD static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, - int argc, char **argv) + int argc, const char **argv) { int status; optind = 0; /* bug in librrd? */ rrd_clear_error (); - status = rrd_create_r (filename, pdp_step, last_up, argc, argv); + status = rrd_create_r (filename, pdp_step, last_up, argc, (void *) argv); if (status != 0) { @@ -351,14 +354,15 @@ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, return (status); } /* int srrd_create */ -static int srrd_update (char *filename, char *template, int argc, char **argv) +static int srrd_update (char *filename, char *template, + int argc, const char **argv) { int status; optind = 0; /* bug in librrd? */ rrd_clear_error (); - status = rrd_update_r (filename, template, argc, argv); + status = rrd_update_r (filename, template, argc, (void *) argv); if (status != 0) { @@ -372,7 +376,7 @@ static int srrd_update (char *filename, char *template, int argc, char **argv) #else /* !HAVE_THREADSAFE_LIBRRD */ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, - int argc, char **argv) + int argc, const char **argv) { int status; @@ -393,10 +397,8 @@ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, if (last_up == 0) last_up = time (NULL) - 10; - snprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step); - pdp_step_str[sizeof (pdp_step_str) - 1] = '\0'; - snprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up); - last_up_str[sizeof (last_up_str) - 1] = '\0'; + ssnprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step); + ssnprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up); new_argv[0] = "create"; new_argv[1] = filename; @@ -426,7 +428,8 @@ static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, return (status); } /* int srrd_create */ -static int srrd_update (char *filename, char *template, int argc, char **argv) +static int srrd_update (char *filename, char *template, + int argc, const char **argv) { int status; @@ -511,7 +514,7 @@ static int rrd_create_file (char *filename, const data_set_t *ds, const value_li status = srrd_create (filename, (stepsize > 0) ? stepsize : vl->interval, vl->time - 10, - argc, argv); + argc, (const char **)argv); free (argv); ds_free (ds_num, ds_def); @@ -529,7 +532,7 @@ static int value_list_to_string (char *buffer, int buffer_len, memset (buffer, '\0', buffer_len); - status = snprintf (buffer, buffer_len, "%u", (unsigned int) vl->time); + status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time); if ((status < 1) || (status >= buffer_len)) return (-1); offset = status; @@ -541,10 +544,10 @@ static int value_list_to_string (char *buffer, int buffer_len, return (-1); if (ds->ds[i].type == DS_TYPE_COUNTER) - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, ":%llu", vl->values[i].counter); else - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, ":%lf", vl->values[i].gauge); if ((status < 1) || (status >= (buffer_len - offset))) @@ -564,35 +567,35 @@ static int value_list_to_filename (char *buffer, int buffer_len, if (datadir != NULL) { - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", datadir); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; } - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", vl->host); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; if (strlen (vl->plugin_instance) > 0) - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s-%s/", vl->plugin, vl->plugin_instance); else - status = snprintf (buffer + offset, buffer_len - offset, + status = ssnprintf (buffer + offset, buffer_len - offset, "%s/", vl->plugin); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; if (strlen (vl->type_instance) > 0) - status = snprintf (buffer + offset, buffer_len - offset, - "%s-%s.rrd", ds->type, vl->type_instance); + status = ssnprintf (buffer + offset, buffer_len - offset, + "%s-%s.rrd", vl->type, vl->type_instance); else - status = snprintf (buffer + offset, buffer_len - offset, - "%s.rrd", ds->type); + status = ssnprintf (buffer + offset, buffer_len - offset, + "%s.rrd", vl->type); if ((status < 1) || (status >= buffer_len - offset)) return (-1); offset += status; @@ -639,7 +642,7 @@ static void *rrd_queue_thread (void *data) * we make a copy of it's values */ pthread_mutex_lock (&cache_lock); - avl_get (cache, queue_entry->filename, (void *) &cache_entry); + c_avl_get (cache, queue_entry->filename, (void *) &cache_entry); values = cache_entry->values; values_num = cache_entry->values_num; @@ -651,7 +654,8 @@ static void *rrd_queue_thread (void *data) pthread_mutex_unlock (&cache_lock); /* Write the values to the RRD-file */ - srrd_update (queue_entry->filename, NULL, values_num, values); + srrd_update (queue_entry->filename, NULL, + values_num, (const char **)values); DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s", values_num, queue_entry->filename); @@ -665,7 +669,7 @@ static void *rrd_queue_thread (void *data) } /* while (42) */ pthread_mutex_lock (&cache_lock); - avl_destroy (cache); + c_avl_destroy (cache); cache = NULL; pthread_mutex_unlock (&cache_lock); @@ -673,37 +677,74 @@ static void *rrd_queue_thread (void *data) return ((void *) 0); } /* void *rrd_queue_thread */ -static int rrd_queue_cache_entry (const char *filename) +static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir) { - rrd_queue_t *queue_entry; - - queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t)); - if (queue_entry == NULL) - return (-1); - - queue_entry->filename = strdup (filename); - if (queue_entry->filename == NULL) - { - free (queue_entry); - return (-1); - } - - queue_entry->next = NULL; - - pthread_mutex_lock (&queue_lock); - if (queue_tail == NULL) - queue_head = queue_entry; - else - queue_tail->next = queue_entry; - queue_tail = queue_entry; - pthread_cond_signal (&queue_cond); - pthread_mutex_unlock (&queue_lock); - - DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename); - - return (0); + rrd_queue_t *queue_entry; + + queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t)); + if (queue_entry == NULL) + return (-1); + + queue_entry->filename = strdup (filename); + if (queue_entry->filename == NULL) + { + free (queue_entry); + return (-1); + } + + queue_entry->next = NULL; + + pthread_mutex_lock (&queue_lock); + if (dir == QUEUE_INSERT_FRONT) + { + queue_entry->next = queue_head; + queue_head = queue_entry; + if (queue_tail == NULL) + queue_tail = queue_head; + } + else /* (dir == QUEUE_INSERT_BACK) */ + { + if (queue_tail == NULL) + queue_head = queue_entry; + else + queue_tail->next = queue_entry; + queue_tail = queue_entry; + } + pthread_cond_signal (&queue_cond); + pthread_mutex_unlock (&queue_lock); + + DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename); + + return (0); } /* int rrd_queue_cache_entry */ +static int rrd_queue_move_to_front (const char *filename) +{ + rrd_queue_t *this; + rrd_queue_t *prev; + + this = NULL; + prev = NULL; + pthread_mutex_lock (&queue_lock); + for (this = queue_head; this != NULL; this = this->next) + { + if (strcmp (this->filename, filename) == 0) + break; + prev = this; + } + + /* Check if we found the entry and if it is NOT the first entry. */ + if ((this != NULL) && (prev != NULL)) + { + prev->next = this->next; + this->next = queue_head; + queue_head = this; + } + pthread_mutex_unlock (&queue_lock); + + return (0); +} /* int rrd_queue_move_to_front */ + static void rrd_cache_flush (int timeout) { rrd_cache_t *rc; @@ -713,7 +754,7 @@ static void rrd_cache_flush (int timeout) int keys_num = 0; char *key; - avl_iterator_t *iter; + c_avl_iterator_t *iter; int i; DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout); @@ -721,8 +762,8 @@ static void rrd_cache_flush (int timeout) now = time (NULL); /* Build a list of entries to be flushed */ - iter = avl_get_iterator (cache); - while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) + iter = c_avl_get_iterator (cache); + while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) { if (rc->flags == FLAG_QUEUED) continue; @@ -730,34 +771,36 @@ static void rrd_cache_flush (int timeout) continue; else if (rc->values_num > 0) { - if (rrd_queue_cache_entry (key) == 0) + if (rrd_queue_cache_entry (key, QUEUE_INSERT_BACK) == 0) rc->flags = FLAG_QUEUED; } else /* ancient and no values -> waste of memory */ { - keys = (char **) realloc ((void *) keys, + char **tmp = (char **) realloc ((void *) keys, (keys_num + 1) * sizeof (char *)); - if (keys == NULL) + if (tmp == NULL) { char errbuf[1024]; ERROR ("rrdtool plugin: " "realloc failed: %s", sstrerror (errno, errbuf, sizeof (errbuf))); - avl_iterator_destroy (iter); + c_avl_iterator_destroy (iter); + sfree (keys); return; } + keys = tmp; keys[keys_num] = key; keys_num++; } - } /* while (avl_iterator_next) */ - avl_iterator_destroy (iter); + } /* while (c_avl_iterator_next) */ + c_avl_iterator_destroy (iter); for (i = 0; i < keys_num; i++) { - if (avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0) + if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0) { - DEBUG ("rrdtool plugin: avl_remove (%s) failed.", keys[i]); + DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]); continue; } @@ -769,11 +812,57 @@ static void rrd_cache_flush (int timeout) keys[i] = NULL; } /* for (i = 0..keys_num) */ - free (keys); + sfree (keys); cache_flush_last = now; } /* void rrd_cache_flush */ +static int rrd_cache_flush_identifier (int timeout, const char *identifier) +{ + rrd_cache_t *rc; + time_t now; + int status; + char key[2048]; + + if (identifier == NULL) + { + rrd_cache_flush (timeout); + return (0); + } + + now = time (NULL); + + if (datadir == NULL) + snprintf (key, sizeof (key), "%s.rrd", + identifier); + else + snprintf (key, sizeof (key), "%s/%s.rrd", + datadir, identifier); + key[sizeof (key) - 1] = 0; + + status = c_avl_get (cache, key, (void *) &rc); + if (status != 0) + { + WARNING ("rrdtool plugin: rrd_cache_flush_identifier: " + "c_avl_get (%s) failed. Does that file really exist?", + key); + return (status); + } + + if (rc->flags == FLAG_QUEUED) + status = rrd_queue_move_to_front (key); + else if ((now - rc->first_value) < timeout) + status = 0; + else if (rc->values_num > 0) + { + status = rrd_queue_cache_entry (key, QUEUE_INSERT_FRONT); + if (status == 0) + rc->flags = FLAG_QUEUED; + } + + return (status); +} /* int rrd_cache_flush_identifier */ + static int rrd_cache_insert (const char *filename, const char *value, time_t value_time) { @@ -783,7 +872,7 @@ static int rrd_cache_insert (const char *filename, pthread_mutex_lock (&cache_lock); - avl_get (cache, filename, (void *) &rc); + c_avl_get (cache, filename, (void *) &rc); if (rc == NULL) { @@ -816,7 +905,7 @@ static int rrd_cache_insert (const char *filename, sstrerror (errno, errbuf, sizeof (errbuf)); - avl_remove (cache, filename, &cache_key, NULL); + c_avl_remove (cache, filename, &cache_key, NULL); pthread_mutex_unlock (&cache_lock); ERROR ("rrdtool plugin: realloc failed: %s", errbuf); @@ -856,13 +945,13 @@ static int rrd_cache_insert (const char *filename, return (-1); } - avl_insert (cache, cache_key, rc); + c_avl_insert (cache, cache_key, rc); } DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; " - "values_num = %i; age = %u;", + "values_num = %i; age = %lu;", filename, rc->values_num, - rc->last_value - rc->first_value); + (unsigned long)(rc->last_value - rc->first_value)); if ((rc->last_value - rc->first_value) >= cache_timeout) { @@ -870,7 +959,7 @@ static int rrd_cache_insert (const char *filename, * the same time, ALWAYS lock `cache_lock' first! */ if (rc->flags != FLAG_QUEUED) { - if (rrd_queue_cache_entry (filename) == 0) + if (rrd_queue_cache_entry (filename, QUEUE_INSERT_BACK) == 0) rc->flags = FLAG_QUEUED; } else @@ -889,6 +978,19 @@ static int rrd_cache_insert (const char *filename, return (0); } /* int rrd_cache_insert */ +static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr) +{ + int a = *((int *) a_ptr); + int b = *((int *) b_ptr); + + if (a < b) + return (-1); + else if (a > b) + return (1); + else + return (0); +} /* int rrd_compare_numeric */ + static int rrd_write (const data_set_t *ds, const value_list_t *vl) { struct stat statbuf; @@ -896,6 +998,11 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) char values[512]; int status; + if (0 != strcmp (ds->type, vl->type)) { + ERROR ("rrdtool plugin: DS type does not match value list type"); + return -1; + } + if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0) return (-1); @@ -930,6 +1037,21 @@ static int rrd_write (const data_set_t *ds, const value_list_t *vl) return (status); } /* int rrd_write */ +static int rrd_flush (int timeout, const char *identifier) +{ + pthread_mutex_lock (&cache_lock); + + if (cache == NULL) { + pthread_mutex_unlock (&cache_lock); + return (0); + } + + rrd_cache_flush_identifier (timeout, identifier); + + pthread_mutex_unlock (&cache_lock); + return (0); +} /* int rrd_flush */ + static int rrd_config (const char *key, const char *value) { if (strcasecmp ("CacheTimeout", key) == 0) @@ -1027,6 +1149,12 @@ static int rrd_config (const char *key, const char *value) if (rra_timespans_custom[rra_timespans_custom_num] != 0) rra_timespans_custom_num++; } /* while (strtok_r) */ + + qsort (/* base = */ rra_timespans_custom, + /* nmemb = */ rra_timespans_custom_num, + /* size = */ sizeof (rra_timespans_custom[0]), + /* compar = */ rrd_compare_numeric); + free (value_copy); } else if (strcasecmp ("XFF", key) == 0) @@ -1076,12 +1204,7 @@ static int rrd_init (void) if (stepsize < 0) stepsize = 0; if (heartbeat <= 0) - { - if (stepsize > 0) - heartbeat = 2 * stepsize; - else - heartbeat = 0; - } + heartbeat = 2 * stepsize; if ((heartbeat > 0) && (heartbeat < interval_g)) WARNING ("rrdtool plugin: Your `heartbeat' is " @@ -1095,10 +1218,10 @@ static int rrd_init (void) /* Set the cache up */ pthread_mutex_lock (&cache_lock); - cache = avl_create ((int (*) (const void *, const void *)) strcmp); + cache = c_avl_create ((int (*) (const void *, const void *)) strcmp); if (cache == NULL) { - ERROR ("rrdtool plugin: avl_create failed."); + ERROR ("rrdtool plugin: c_avl_create failed."); return (-1); } @@ -1134,5 +1257,6 @@ void module_register (void) config_keys, config_keys_num); plugin_register_init ("rrdtool", rrd_init); plugin_register_write ("rrdtool", rrd_write); + plugin_register_flush ("rrdtool", rrd_flush); plugin_register_shutdown ("rrdtool", rrd_shutdown); }