X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdtool.c;h=e356bf19d6d91fb4a275fffb840df0c6f3827a4b;hb=939a113b288dfdee7df2263c682d27d7146c6013;hp=4d4b87cbeaa38b63201dcb12153596caaa04a1dc;hpb=e4080f7aea79ffa3ad21fb2c8e6be311b1a404b9;p=collectd.git diff --git a/src/rrdtool.c b/src/rrdtool.c index 4d4b87cb..e356bf19 100644 --- a/src/rrdtool.c +++ b/src/rrdtool.c @@ -24,6 +24,8 @@ #include "common.h" #include "utils_avltree.h" +#include + #if HAVE_PTHREAD_H # include #endif @@ -112,6 +114,10 @@ static pthread_t queue_thread = 0; static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; +#if !HAVE_THREADSAFE_LIBRRD +static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER; +#endif + static int do_shutdown = 0; /* * * * * * * * * * @@ -327,6 +333,143 @@ static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl) return (ds_num); } +#if HAVE_THREADSAFE_LIBRRD +static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, + int argc, char **argv) +{ + int status; + + optind = 0; /* bug in librrd? */ + rrd_clear_error (); + + status = rrd_create_r (filename, pdp_step, last_up, argc, argv); + + if (status != 0) + { + WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s", + filename, rrd_get_error ()); + } + + return (status); +} /* int srrd_create */ + +static int srrd_update (char *filename, char *template, int argc, char **argv) +{ + int status; + + optind = 0; /* bug in librrd? */ + rrd_clear_error (); + + status = rrd_update_r (filename, template, argc, argv); + + if (status != 0) + { + WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s", + filename, rrd_get_error ()); + } + + return (status); +} /* int srrd_update */ +/* #endif HAVE_THREADSAFE_LIBRRD */ + +#else /* !HAVE_THREADSAFE_LIBRRD */ +static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up, + int argc, char **argv) +{ + int status; + + int new_argc; + char **new_argv; + + char pdp_step_str[16]; + char last_up_str[16]; + + new_argc = 6 + argc; + new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *)); + if (new_argv == NULL) + { + ERROR ("rrdtool plugin: malloc failed."); + return (-1); + } + + if (last_up == 0) + last_up = time (NULL) - 10; + + snprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step); + pdp_step_str[sizeof (pdp_step_str) - 1] = '\0'; + snprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up); + last_up_str[sizeof (last_up_str) - 1] = '\0'; + + new_argv[0] = "create"; + new_argv[1] = filename; + new_argv[2] = "-s"; + new_argv[3] = pdp_step_str; + new_argv[4] = "-b"; + new_argv[5] = last_up_str; + + memcpy (new_argv + 6, argv, argc * sizeof (char *)); + new_argv[new_argc] = NULL; + + pthread_mutex_lock (&librrd_lock); + optind = 0; /* bug in librrd? */ + rrd_clear_error (); + + status = rrd_create (new_argc, new_argv); + pthread_mutex_unlock (&librrd_lock); + + if (status != 0) + { + WARNING ("rrdtool plugin: rrd_create (%s) failed: %s", + filename, rrd_get_error ()); + } + + sfree (new_argv); + + return (status); +} /* int srrd_create */ + +static int srrd_update (char *filename, char *template, int argc, char **argv) +{ + int status; + + int new_argc; + char **new_argv; + + assert (template == NULL); + + new_argc = 2 + argc; + new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *)); + if (new_argv == NULL) + { + ERROR ("rrdtool plugin: malloc failed."); + return (-1); + } + + new_argv[0] = "update"; + new_argv[1] = filename; + + memcpy (new_argv + 2, argv, argc * sizeof (char *)); + new_argv[new_argc] = NULL; + + pthread_mutex_lock (&librrd_lock); + optind = 0; /* bug in librrd? */ + rrd_clear_error (); + + status = rrd_update (new_argc, new_argv); + pthread_mutex_unlock (&librrd_lock); + + if (status != 0) + { + WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s", + argv[1], rrd_get_error ()); + } + + sfree (new_argv); + + return (status); +} /* int srrd_update */ +#endif /* !HAVE_THREADSAFE_LIBRRD */ + static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl) { char **argv; @@ -335,8 +478,6 @@ static int rrd_create_file (char *filename, const data_set_t *ds, const value_li int rra_num; char **ds_def; int ds_num; - int i, j; - char stepsize_str[16]; int status = 0; if (check_create_dir (filename)) @@ -354,7 +495,7 @@ static int rrd_create_file (char *filename, const data_set_t *ds, const value_li return (-1); } - argc = ds_num + rra_num + 4; + argc = ds_num + rra_num; if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL) { @@ -364,36 +505,15 @@ static int rrd_create_file (char *filename, const data_set_t *ds, const value_li return (-1); } - status = snprintf (stepsize_str, sizeof (stepsize_str), - "%i", (stepsize > 0) ? stepsize : vl->interval); - if ((status < 1) || (status >= sizeof (stepsize_str))) - { - ERROR ("rrdtool plugin: snprintf failed."); - free (argv); - ds_free (ds_num, ds_def); - rra_free (rra_num, rra_def); - return (-1); - } - - argv[0] = "create"; - argv[1] = filename; - argv[2] = "-s"; - argv[3] = stepsize_str; + memcpy (argv, ds_def, ds_num * sizeof (char *)); + memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *)); + argv[ds_num + rra_num] = NULL; - j = 4; - for (i = 0; i < ds_num; i++) - argv[j++] = ds_def[i]; - for (i = 0; i < rra_num; i++) - argv[j++] = rra_def[i]; - argv[j] = NULL; - - optind = 0; /* bug in librrd? */ - rrd_clear_error (); - if (rrd_create (argc, argv) == -1) - { - ERROR ("rrd_create failed: %s: %s", filename, rrd_get_error ()); - status = -1; - } + assert (vl->time > 10); + status = srrd_create (filename, + (stepsize > 0) ? stepsize : vl->interval, + vl->time - 10, + argc, argv); free (argv); ds_free (ds_num, ds_def); @@ -482,42 +602,6 @@ static int value_list_to_filename (char *buffer, int buffer_len, return (0); } /* int value_list_to_filename */ -static int rrd_write_to_file (char *filename, char **values, int values_num) -{ - char **argv; - int argc; - int status; - - if (values_num < 1) - return (0); - - argc = values_num + 2; - argv = (char **) malloc ((argc + 1) * sizeof (char *)); - if (argv == NULL) - return (-1); - - argv[0] = "update"; - argv[1] = filename; - memcpy (argv + 2, values, values_num * sizeof (char *)); - argv[argc] = NULL; - - DEBUG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv); - - optind = 0; /* bug in librrd? */ - rrd_clear_error (); - status = rrd_update (argc, argv); - if (status != 0) - { - WARNING ("rrd_update failed: %s: %s", - filename, rrd_get_error ()); - status = -1; - } - - sfree (argv); - - return (status); -} /* int rrd_write_cache_entry */ - static void *rrd_queue_thread (void *data) { while (42) @@ -569,7 +653,9 @@ static void *rrd_queue_thread (void *data) pthread_mutex_unlock (&cache_lock); /* Write the values to the RRD-file */ - rrd_write_to_file (queue_entry->filename, values, values_num); + srrd_update (queue_entry->filename, NULL, values_num, values); + DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s", + values_num, queue_entry->filename); for (i = 0; i < values_num; i++) { @@ -632,7 +718,7 @@ static void rrd_cache_flush (int timeout) avl_iterator_t *iter; int i; - DEBUG ("Flushing cache, timeout = %i", timeout); + DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout); now = time (NULL); @@ -640,8 +726,6 @@ static void rrd_cache_flush (int timeout) iter = avl_get_iterator (cache); while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0) { - DEBUG ("key = %s; age = %i;", key, now - rc->first_value); - if (rc->flags == FLAG_QUEUED) continue; else if ((now - rc->first_value) < timeout) @@ -675,7 +759,7 @@ static void rrd_cache_flush (int timeout) { if (avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0) { - DEBUG ("avl_remove (%s) failed.", keys[i]); + DEBUG ("rrdtool plugin: avl_remove (%s) failed.", keys[i]); continue; } @@ -688,7 +772,6 @@ static void rrd_cache_flush (int timeout) } /* for (i = 0..keys_num) */ free (keys); - DEBUG ("Flushed %i value(s)", keys_num); cache_flush_last = now; } /* void rrd_cache_flush */ @@ -778,8 +861,10 @@ static int rrd_cache_insert (const char *filename, avl_insert (cache, cache_key, rc); } - DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value, - (unsigned int) value_time, (void *) rc); + DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; " + "values_num = %i; age = %u;", + filename, rc->values_num, + rc->last_value - rc->first_value); if ((rc->last_value - rc->first_value) >= cache_timeout) { @@ -975,6 +1060,14 @@ static int rrd_shutdown (void) pthread_cond_signal (&queue_cond); pthread_mutex_unlock (&queue_lock); + /* Wait for all the values to be written to disk before returning. */ + if (queue_thread != 0) + { + pthread_join (queue_thread, NULL); + queue_thread = 0; + DEBUG ("rrdtool plugin: queue_thread exited."); + } + return (0); } /* int rrd_shutdown */