X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=0aee64667d200651b548573bb30138e5c04fb8bb;hb=4088bccc40644f45971304f4c73bc40dcfb87957;hp=894b0e51d72731ba1f7faab5c032a122063e85be;hpb=e9cc06595b478d522773be7cd03423f84f8d43b6;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 894b0e51..0aee6466 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -31,6 +31,7 @@ #include "utils_llist.h" #include "utils_heap.h" #include "utils_time.h" +#include "utils_random.h" #if HAVE_PTHREAD_H # include @@ -108,6 +109,7 @@ static int read_threads_num = 0; static write_queue_t *write_queue_head; static write_queue_t *write_queue_tail; +static long write_queue_length = 0; static _Bool write_loop = 1; static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; @@ -117,6 +119,10 @@ static size_t write_threads_num = 0; static pthread_key_t plugin_ctx_key; static _Bool plugin_ctx_key_initialized = 0; +static long write_limit_high = 0; +static long write_limit_low = 0; +cdtime_t last_drop_time = 0; + /* * Static functions */ @@ -670,11 +676,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ { write_queue_head = q; write_queue_tail = q; + write_queue_length = 1; } else { write_queue_tail->next = q; write_queue_tail = q; + write_queue_length += 1; } pthread_cond_signal (&write_cond); @@ -701,8 +709,11 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */ q = write_queue_head; write_queue_head = q->next; - if (write_queue_head == NULL) + write_queue_length -= 1; + if (write_queue_head == NULL) { write_queue_tail = NULL; + assert(0 == write_queue_length); + } pthread_mutex_unlock (&write_lock); @@ -805,6 +816,7 @@ static void stop_write_threads (void) /* {{{ */ } write_queue_head = NULL; write_queue_tail = NULL; + write_queue_length = 0; pthread_mutex_unlock (&write_lock); if (i > 0) @@ -1436,6 +1448,9 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + write_limit_high = global_option_get_long_in_range("WriteQueueLengthLimitHigh",0, 0, LONG_MAX); + write_limit_low = global_option_get_long_in_range("WriteQueueLengthLimitLow", (write_limit_high+1)/2, 0, (write_limit_high == 0) ? 0 : write_limit_high-1 ); + { char const *tmp = global_option_get ("WriteThreads"); int num = atoi (tmp); @@ -1950,10 +1965,55 @@ static int plugin_dispatch_values_internal (value_list_t *vl) return (0); } /* int plugin_dispatch_values_internal */ +static _Bool drop_metric(void) { + _Bool drop = 0; + int wq_len = write_queue_length; + /* We store write_queue_length in a local variable because other threads may update write_queue_length. + * Having this in a local variable (like a cache) is better : we do not need a lock */ + + if(wq_len < write_limit_low) return(0); + + if((write_limit_high > 0) && (wq_len > write_limit_low)) { + if(wq_len >= write_limit_high) { + /* if high == low, we come here too */ + drop = 1; + } else { + /* here, high != low */ + long probability_to_drop; + long n; + + probability_to_drop = (wq_len - write_limit_low); + + n = cdrand_range(write_limit_low, write_limit_high); + + /* Let's have X = high - low. + * n is in range [0..X] + * probability_to_drop is in range [1..X[ + * probability_to_drop gets bigger when wq_len gets bigger. + */ + if(n <= probability_to_drop) { + drop = 1; + } + } + } + if(drop) { + cdtime_t now = cdtime(); + if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) { + last_drop_time = now; + /* If you want to count dropped metrics, don't forget to add a lock here */ + /* dropped_metrics++; */ + ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric"); + } + } + return(drop); +} + int plugin_dispatch_values (value_list_t const *vl) { int status; + if(drop_metric ()) return(0); + status = plugin_write_enqueue (vl); if (status != 0) {