#include "utils_llist.h"
#include "utils_heap.h"
#include "utils_time.h"
+#include "utils_random.h"
#if HAVE_PTHREAD_H
# include <pthread.h>
static write_queue_t *write_queue_head;
static write_queue_t *write_queue_tail;
+static long write_queue_length = 0;
static _Bool write_loop = 1;
static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
static pthread_key_t plugin_ctx_key;
static _Bool plugin_ctx_key_initialized = 0;
+static long write_limit_high = 0;
+static long write_limit_low = 0;
+
/*
* Static functions
*/
{
write_queue_head = q;
write_queue_tail = q;
+ write_queue_length = 1;
}
else
{
write_queue_tail->next = q;
write_queue_tail = q;
+ write_queue_length += 1;
}
pthread_cond_signal (&write_cond);
q = write_queue_head;
write_queue_head = q->next;
- if (write_queue_head == NULL)
+ write_queue_length -= 1;
+ if (write_queue_head == NULL) {
write_queue_tail = NULL;
+ assert(0 == write_queue_length);
+ }
pthread_mutex_unlock (&write_lock);
}
write_queue_head = NULL;
write_queue_tail = NULL;
+ write_queue_length = 0;
pthread_mutex_unlock (&write_lock);
if (i > 0)
void plugin_init_all (void)
{
- const char *chain_name;
+ char const *chain_name;
+ long write_threads_num;
llentry_t *le;
int status;
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
+ write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
+ /* default = */ 0);
+ if (write_limit_high < 0)
{
- char const *tmp = global_option_get ("WriteThreads");
- int num = atoi (tmp);
+ ERROR ("WriteQueueLimitHigh must be positive or zero.");
+ write_limit_high = 0;
+ }
- if (num < 1)
- num = 5;
+ write_limit_low = global_option_get_long ("WriteQueueLimitLow",
+ /* default = */ write_limit_high / 2);
+ if (write_limit_low < 0)
+ {
+ ERROR ("WriteQueueLimitLow must be positive or zero.");
+ write_limit_low = write_limit_high / 2;
+ }
+ else if (write_limit_low > write_limit_high)
+ {
+ ERROR ("WriteQueueLimitLow must not be larger than "
+ "WriteQueueLimitHigh.");
+ write_limit_low = write_limit_high;
+ }
- start_write_threads ((size_t) num);
+ write_threads_num = global_option_get_long ("WriteThreads",
+ /* default = */ 5);
+ if (write_threads_num < 1)
+ {
+ ERROR ("WriteThreads must be positive.");
+ write_threads_num = 5;
}
+ start_write_threads ((size_t) write_threads_num);
+
if ((list_init == NULL) && (read_heap == NULL))
return;
return (0);
} /* int plugin_dispatch_values_internal */
+static double get_drop_probability (void) /* {{{ */
+{
+ long pos;
+ long size;
+ long wql;
+
+ pthread_mutex_lock (&write_lock);
+ wql = write_queue_length;
+ pthread_mutex_unlock (&write_lock);
+
+ if (wql < write_limit_low)
+ return (0.0);
+ if (wql >= write_limit_high)
+ return (1.0);
+
+ pos = 1 + wql - write_limit_low;
+ size = 1 + write_limit_high - write_limit_low;
+
+ return (((double) pos) / ((double) size));
+} /* }}} double get_drop_probability */
+
+static _Bool check_drop_value (void) /* {{{ */
+{
+ static cdtime_t last_message_time = 0;
+ static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+ double p;
+ double q;
+ int status;
+
+ if (write_limit_high == 0)
+ return (0);
+
+ p = get_drop_probability ();
+ if (p == 0.0)
+ return (0);
+
+ status = pthread_mutex_trylock (&last_message_lock);
+ if (status == 0)
+ {
+ cdtime_t now;
+
+ now = cdtime ();
+ if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+ {
+ last_message_time = now;
+ ERROR ("plugin_dispatch_values: Low water mark "
+ "reached. Dropping %.0f%% of metrics.",
+ 100.0 * p);
+ }
+ pthread_mutex_unlock (&last_message_lock);
+ }
+
+ if (p == 1.0)
+ return (1);
+
+ q = cdrand_d ();
+ if (q > p)
+ return (1);
+ else
+ return (0);
+} /* }}} _Bool check_drop_value */
+
int plugin_dispatch_values (value_list_t const *vl)
{
int status;
+ if (check_drop_value ())
+ return (0);
+
status = plugin_write_enqueue (vl);
if (status != 0)
{