X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=d3767d1283b9678bec65e8665d5feb7b8c64bef7;hb=96e0f2341bace029acefe0a88bab96ae326c0ff5;hp=70372342927c9fd2264cb70952fbd308d8317150;hpb=ea8ad6fe7f5b6af5889328b1fa87558cbc51d6ef;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 70372342..d3767d12 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -21,22 +21,22 @@ **/ #include "collectd.h" -#include "utils_complain.h" - -#include - -#if HAVE_PTHREAD_H -# include -#endif - #include "common.h" #include "plugin.h" #include "configfile.h" +#include "filter_chain.h" #include "utils_avltree.h" +#include "utils_cache.h" +#include "utils_complain.h" #include "utils_llist.h" #include "utils_heap.h" -#include "utils_cache.h" -#include "filter_chain.h" +#include "utils_time.h" + +#if HAVE_PTHREAD_H +# include +#endif + +#include /* * Private structures @@ -63,9 +63,9 @@ struct read_func_s char rf_group[DATA_MAX_NAME_LEN]; char rf_name[DATA_MAX_NAME_LEN]; int rf_type; - struct timespec rf_interval; - struct timespec rf_effective_interval; - struct timespec rf_next_read; + cdtime_t rf_interval; + cdtime_t rf_effective_interval; + cdtime_t rf_next_read; }; typedef struct read_func_s read_func_t; @@ -360,13 +360,6 @@ static int plugin_load_file (char *file, uint32_t flags) return (0); } -static _Bool timeout_reached(struct timespec timeout) -{ - struct timeval now; - gettimeofday(&now, NULL); - return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000)); -} - static void *plugin_read_thread (void __attribute__((unused)) *args) { while (read_loop != 0) @@ -392,18 +385,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) } pthread_mutex_unlock (&read_lock); - if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0)) + if (rf->rf_interval == 0) { /* this should not happen, because the interval is set * for each plugin when loading it * XXX: issue a warning? */ - now = cdtime (); - - CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval); - + rf->rf_interval = plugin_get_interval (); rf->rf_effective_interval = rf->rf_interval; - CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + rf->rf_next_read = cdtime (); } /* sleep until this entry is due, @@ -415,11 +405,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) * pthread_cond_timedwait returns. */ rc = 0; while ((read_loop != 0) - && !timeout_reached(rf->rf_next_read) + && (cdtime () < rf->rf_next_read) && rc == 0) { + struct timespec ts = { 0 }; + + CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts); + rc = pthread_cond_timedwait (&read_cond, &read_lock, - &rf->rf_next_read); + &ts); } /* Must hold `read_lock' when accessing `rf->rf_type'. */ @@ -475,20 +469,14 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) * intervals in which it will be called. */ if (status != 0) { - rf->rf_effective_interval.tv_sec *= 2; - rf->rf_effective_interval.tv_nsec *= 2; - NORMALIZE_TIMESPEC (rf->rf_effective_interval); - - if (rf->rf_effective_interval.tv_sec >= 86400) - { - rf->rf_effective_interval.tv_sec = 86400; - rf->rf_effective_interval.tv_nsec = 0; - } + rf->rf_effective_interval *= 2; + if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400)) + rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400); NOTICE ("read-function of plugin `%s' failed. " - "Will suspend it for %i seconds.", + "Will suspend it for %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); } else { @@ -500,32 +488,26 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) now = cdtime (); DEBUG ("plugin_read_thread: Effective interval of the " - "%s plugin is %i.%09i.", + "%s plugin is %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec, - (int) rf->rf_effective_interval.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); /* Calculate the next (absolute) time at which this function * should be called. */ - rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec - + rf->rf_effective_interval.tv_sec; - rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec - + rf->rf_effective_interval.tv_nsec; - NORMALIZE_TIMESPEC (rf->rf_next_read); + rf->rf_next_read += rf->rf_effective_interval; /* Check, if `rf_next_read' is in the past. */ - if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now) + if (rf->rf_next_read < now) { /* `rf_next_read' is in the past. Insert `now' * so this value doesn't trail off into the * past too much. */ - CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + rf->rf_next_read = now; } - DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.", + DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.", rf->rf_name, - (int) rf->rf_next_read.tv_sec, - (int) rf->rf_next_read.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_next_read)); /* Re-insert this read function into the heap again. */ c_heap_insert (read_heap, rf); @@ -810,10 +792,12 @@ static void stop_write_threads (void) /* {{{ */ pthread_mutex_lock (&write_lock); i = 0; - for (q = write_queue_head; q != NULL; q = q->next) + for (q = write_queue_head; q != NULL; ) { + write_queue_t *q1 = q; plugin_value_list_free (q->vl); - sfree (q); + q = q->next; + sfree (q1); i++; } write_queue_head = NULL; @@ -965,13 +949,9 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1) rf0 = arg0; rf1 = arg1; - if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec) - return (-1); - else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec) - return (1); - else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec) + if (rf0->rf_next_read < rf1->rf_next_read) return (-1); - else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec) + else if (rf0->rf_next_read > rf1->rf_next_read) return (1); else return (0); @@ -985,6 +965,9 @@ static int plugin_insert_read (read_func_t *rf) int status; llentry_t *le; + rf->rf_next_read = cdtime (); + rf->rf_effective_interval = rf->rf_interval; + pthread_mutex_lock (&read_lock); if (read_list == NULL) @@ -1046,43 +1029,12 @@ static int plugin_insert_read (read_func_t *rf) return (0); } /* int plugin_insert_read */ -static int read_cb_wrapper (user_data_t *ud) -{ - int (*callback) (void); - - if (ud == NULL) - return -1; - - callback = ud->data; - return callback(); -} /* int read_cb_wrapper */ - int plugin_register_read (const char *name, int (*callback) (void)) { read_func_t *rf; - plugin_ctx_t ctx = plugin_get_ctx (); int status; - if (ctx.interval != 0) { - /* If ctx.interval is not zero (== use the plugin or global - * interval), we need to use the "complex" read callback, - * because only that allows to specify a different interval. - * Wrap the callback using read_cb_wrapper(). */ - struct timespec interval; - user_data_t user_data; - - user_data.data = callback; - user_data.free_func = NULL; - - CDTIME_T_TO_TIMESPEC (ctx.interval, &interval); - return plugin_register_complex_read (/* group = */ NULL, - name, read_cb_wrapper, &interval, &user_data); - } - - DEBUG ("plugin_register_read: default_interval = %.3f", - CDTIME_T_TO_DOUBLE(plugin_get_interval ())); - rf = malloc (sizeof (*rf)); if (rf == NULL) { @@ -1094,13 +1046,11 @@ int plugin_register_read (const char *name, rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; - rf->rf_ctx = ctx; + rf->rf_ctx = plugin_get_ctx (); rf->rf_group[0] = '\0'; sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_SIMPLE; - rf->rf_interval.tv_sec = 0; - rf->rf_interval.tv_nsec = 0; - rf->rf_effective_interval = rf->rf_interval; + rf->rf_interval = plugin_get_interval (); status = plugin_insert_read (rf); if (status != 0) @@ -1115,7 +1065,6 @@ int plugin_register_complex_read (const char *group, const char *name, user_data_t *user_data) { read_func_t *rf; - plugin_ctx_t ctx = plugin_get_ctx (); int status; rf = malloc (sizeof (*rf)); @@ -1134,18 +1083,9 @@ int plugin_register_complex_read (const char *group, const char *name, sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_COMPLEX; if (interval != NULL) - { - rf->rf_interval = *interval; - } - else if (ctx.interval != 0) - { - CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval); - } - rf->rf_effective_interval = rf->rf_interval; - - DEBUG ("plugin_register_read: interval = %i.%09i", - (int) rf->rf_interval.tv_sec, - (int) rf->rf_interval.tv_nsec); + rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval); + else + rf->rf_interval = plugin_get_interval (); /* Set user data */ if (user_data == NULL) @@ -1158,7 +1098,7 @@ int plugin_register_complex_read (const char *group, const char *name, rf->rf_udata = *user_data; } - rf->rf_ctx = ctx; + rf->rf_ctx = plugin_get_ctx (); status = plugin_insert_read (rf); if (status != 0) @@ -1195,6 +1135,27 @@ int plugin_register_shutdown (const char *name, (void *) callback, /* user_data = */ NULL)); } /* int plugin_register_shutdown */ +static void plugin_free_data_sets (void) +{ + void *key; + void *value; + + if (data_sets == NULL) + return; + + while (c_avl_pick (data_sets, &key, &value) == 0) + { + data_set_t *ds = value; + /* key is a pointer to ds->type */ + + sfree (ds->ds); + sfree (ds); + } + + c_avl_destroy (data_sets); + data_sets = NULL; +} /* void plugin_free_data_sets */ + int plugin_register_data_set (const data_set_t *ds) { data_set_t *ds_copy; @@ -1717,6 +1678,8 @@ void plugin_shutdown_all (void) destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); + + plugin_free_data_sets (); } /* void plugin_shutdown_all */ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ @@ -2076,6 +2039,12 @@ const data_set_t *plugin_get_ds (const char *name) { data_set_t *ds; + if (data_sets == NULL) + { + ERROR ("plugin_get_ds: No data sets are defined yet."); + return (NULL); + } + if (c_avl_get (data_sets, name, (void *) &ds) != 0) { DEBUG ("No such dataset registered: %s", name);