X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=b60c04ba127f32c4603dd4f08ecfc64ef155d67e;hb=ecbcaadde7dff838f09dfb04c7eb90f126e35abf;hp=894b0e51d72731ba1f7faab5c032a122063e85be;hpb=e9cc06595b478d522773be7cd03423f84f8d43b6;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 894b0e51..b60c04ba 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -31,6 +31,7 @@ #include "utils_llist.h" #include "utils_heap.h" #include "utils_time.h" +#include "utils_random.h" #if HAVE_PTHREAD_H # include @@ -108,6 +109,7 @@ static int read_threads_num = 0; static write_queue_t *write_queue_head; static write_queue_t *write_queue_tail; +static long write_queue_length = 0; static _Bool write_loop = 1; static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; @@ -117,6 +119,9 @@ static size_t write_threads_num = 0; static pthread_key_t plugin_ctx_key; static _Bool plugin_ctx_key_initialized = 0; +static long write_limit_high = 0; +static long write_limit_low = 0; + /* * Static functions */ @@ -334,7 +339,7 @@ static int plugin_load_file (char *file, uint32_t flags) ssnprintf (errbuf, sizeof (errbuf), "lt_dlopen (\"%s\") failed: %s. " - "The most common cause for this problem are " + "The most common cause for this problem is " "missing dependencies. Use ldd(1) to check " "the dependencies of the plugin " "/ shared object.", @@ -670,11 +675,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ { write_queue_head = q; write_queue_tail = q; + write_queue_length = 1; } else { write_queue_tail->next = q; write_queue_tail = q; + write_queue_length += 1; } pthread_cond_signal (&write_cond); @@ -701,8 +708,11 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */ q = write_queue_head; write_queue_head = q->next; - if (write_queue_head == NULL) + write_queue_length -= 1; + if (write_queue_head == NULL) { write_queue_tail = NULL; + assert(0 == write_queue_length); + } pthread_mutex_unlock (&write_lock); @@ -805,6 +815,7 @@ static void stop_write_threads (void) /* {{{ */ } write_queue_head = NULL; write_queue_tail = NULL; + write_queue_length = 0; pthread_mutex_unlock (&write_lock); if (i > 0) @@ -1206,6 +1217,27 @@ int plugin_register_shutdown (const char *name, (void *) callback, /* user_data = */ NULL)); } /* int plugin_register_shutdown */ +static void plugin_free_data_sets (void) +{ + void *key; + void *value; + + if (data_sets == NULL) + return; + + while (c_avl_pick (data_sets, &key, &value) == 0) + { + data_set_t *ds = value; + /* key is a pointer to ds->type */ + + sfree (ds->ds); + sfree (ds); + } + + c_avl_destroy (data_sets); + data_sets = NULL; +} /* void plugin_free_data_sets */ + int plugin_register_data_set (const data_set_t *ds) { data_set_t *ds_copy; @@ -1423,7 +1455,8 @@ int plugin_unregister_notification (const char *name) void plugin_init_all (void) { - const char *chain_name; + char const *chain_name; + long write_threads_num; llentry_t *le; int status; @@ -1436,16 +1469,38 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + write_limit_high = global_option_get_long ("WriteQueueLimitHigh", + /* default = */ 0); + if (write_limit_high < 0) { - char const *tmp = global_option_get ("WriteThreads"); - int num = atoi (tmp); + ERROR ("WriteQueueLimitHigh must be positive or zero."); + write_limit_high = 0; + } - if (num < 1) - num = 5; + write_limit_low = global_option_get_long ("WriteQueueLimitLow", + /* default = */ write_limit_high / 2); + if (write_limit_low < 0) + { + ERROR ("WriteQueueLimitLow must be positive or zero."); + write_limit_low = write_limit_high / 2; + } + else if (write_limit_low > write_limit_high) + { + ERROR ("WriteQueueLimitLow must not be larger than " + "WriteQueueLimitHigh."); + write_limit_low = write_limit_high; + } - start_write_threads ((size_t) num); + write_threads_num = global_option_get_long ("WriteThreads", + /* default = */ 5); + if (write_threads_num < 1) + { + ERROR ("WriteThreads must be positive."); + write_threads_num = 5; } + start_write_threads ((size_t) write_threads_num); + if ((list_init == NULL) && (read_heap == NULL)) return; @@ -1730,6 +1785,7 @@ void plugin_shutdown_all (void) destroy_all_callbacks (&list_log); plugin_free_loaded (); + plugin_free_data_sets (); } /* void plugin_shutdown_all */ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ @@ -1950,10 +2006,76 @@ static int plugin_dispatch_values_internal (value_list_t *vl) return (0); } /* int plugin_dispatch_values_internal */ +static double get_drop_probability (void) /* {{{ */ +{ + long pos; + long size; + long wql; + + pthread_mutex_lock (&write_lock); + wql = write_queue_length; + pthread_mutex_unlock (&write_lock); + + if (wql < write_limit_low) + return (0.0); + if (wql >= write_limit_high) + return (1.0); + + pos = 1 + wql - write_limit_low; + size = 1 + write_limit_high - write_limit_low; + + return (((double) pos) / ((double) size)); +} /* }}} double get_drop_probability */ + +static _Bool check_drop_value (void) /* {{{ */ +{ + static cdtime_t last_message_time = 0; + static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER; + + double p; + double q; + int status; + + if (write_limit_high == 0) + return (0); + + p = get_drop_probability (); + if (p == 0.0) + return (0); + + status = pthread_mutex_trylock (&last_message_lock); + if (status == 0) + { + cdtime_t now; + + now = cdtime (); + if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1)) + { + last_message_time = now; + ERROR ("plugin_dispatch_values: Low water mark " + "reached. Dropping %.0f%% of metrics.", + 100.0 * p); + } + pthread_mutex_unlock (&last_message_lock); + } + + if (p == 1.0) + return (1); + + q = cdrand_d (); + if (q > p) + return (1); + else + return (0); +} /* }}} _Bool check_drop_value */ + int plugin_dispatch_values (value_list_t const *vl) { int status; + if (check_drop_value ()) + return (0); + status = plugin_write_enqueue (vl); if (status != 0) {