static long write_limit_high = 0;
static long write_limit_low = 0;
-cdtime_t last_drop_time = 0;
/*
* Static functions
while (42)
{
- callback_func_t *cf;
+ read_func_t *rf;
- cf = c_heap_get_root (read_heap);
- if (cf == NULL)
+ rf = c_heap_get_root (read_heap);
+ if (rf == NULL)
break;
-
- destroy_callback (cf);
+ sfree (rf->rf_name);
+ destroy_callback ((callback_func_t *) rf);
}
c_heap_destroy (read_heap);
ssnprintf (errbuf, sizeof (errbuf),
"lt_dlopen (\"%s\") failed: %s. "
- "The most common cause for this problem are "
+ "The most common cause for this problem is "
"missing dependencies. Use ldd(1) to check "
"the dependencies of the plugin "
"/ shared object.",
return (status);
}
-static void plugin_free_loaded ()
+static void plugin_free_loaded (void)
{
void *key;
void *value;
const char *dir;
char filename[BUFSIZE] = "";
char typename[BUFSIZE];
- int typename_len;
int ret;
struct stat statbuf;
struct dirent *de;
WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name);
return (-1);
}
- typename_len = strlen (typename);
if ((dh = opendir (dir)) == NULL)
{
while ((de = readdir (dh)) != NULL)
{
- if (strncasecmp (de->d_name, typename, typename_len))
+ if (strcasecmp (de->d_name, typename))
continue;
status = ssnprintf (filename, sizeof (filename),
rf->rf_interval = plugin_get_interval ();
status = plugin_insert_read (rf);
- if (status != 0)
+ if (status != 0) {
+ sfree (rf->rf_name);
sfree (rf);
+ }
return (status);
} /* int plugin_register_read */
rf->rf_ctx = plugin_get_ctx ();
status = plugin_insert_read (rf);
- if (status != 0)
+ if (status != 0) {
+ sfree (rf->rf_name);
sfree (rf);
+ }
return (status);
} /* int plugin_register_complex_read */
(void *) callback, /* user_data = */ NULL));
} /* int plugin_register_shutdown */
+static void plugin_free_data_sets (void)
+{
+ void *key;
+ void *value;
+
+ if (data_sets == NULL)
+ return;
+
+ while (c_avl_pick (data_sets, &key, &value) == 0)
+ {
+ data_set_t *ds = value;
+ /* key is a pointer to ds->type */
+
+ sfree (ds->ds);
+ sfree (ds);
+ }
+
+ c_avl_destroy (data_sets);
+ data_sets = NULL;
+} /* void plugin_free_data_sets */
+
int plugin_register_data_set (const data_set_t *ds)
{
data_set_t *ds_copy;
void plugin_init_all (void)
{
char const *chain_name;
- long write_threads_num;
llentry_t *le;
int status;
write_threads_num = 5;
}
- start_write_threads ((size_t) write_threads_num);
-
if ((list_init == NULL) && (read_heap == NULL))
return;
le = le->next;
}
+ start_write_threads ((size_t) write_threads_num);
+
/* Start read-threads */
if (read_heap != NULL)
{
return_status = -1;
}
+ sfree (rf->rf_name);
destroy_callback ((void *) rf);
}
destroy_all_callbacks (&list_log);
plugin_free_loaded ();
+ plugin_free_data_sets ();
} /* void plugin_shutdown_all */
int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
return (0);
} /* int plugin_dispatch_values_internal */
-static _Bool drop_metric(void) {
- _Bool drop = 0;
- int wq_len = write_queue_length;
- /* We store write_queue_length in a local variable because other threads may update write_queue_length.
- * Having this in a local variable (like a cache) is better : we do not need a lock */
-
- if(wq_len < write_limit_low) return(0);
-
- if((write_limit_high > 0) && (wq_len > write_limit_low)) {
- if(wq_len >= write_limit_high) {
- /* if high == low, we come here too */
- drop = 1;
- } else {
- /* here, high != low */
- long probability_to_drop;
- long n;
-
- probability_to_drop = (wq_len - write_limit_low);
-
- n = cdrand_range(write_limit_low, write_limit_high);
-
- /* Let's have X = high - low.
- * n is in range [0..X]
- * probability_to_drop is in range [1..X[
- * probability_to_drop gets bigger when wq_len gets bigger.
- */
- if(n <= probability_to_drop) {
- drop = 1;
- }
- }
- }
- if(drop) {
- cdtime_t now = cdtime();
- if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) {
- last_drop_time = now;
- /* If you want to count dropped metrics, don't forget to add a lock here */
- /* dropped_metrics++; */
- ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric");
+static double get_drop_probability (void) /* {{{ */
+{
+ long pos;
+ long size;
+ long wql;
+
+ pthread_mutex_lock (&write_lock);
+ wql = write_queue_length;
+ pthread_mutex_unlock (&write_lock);
+
+ if (wql < write_limit_low)
+ return (0.0);
+ if (wql >= write_limit_high)
+ return (1.0);
+
+ pos = 1 + wql - write_limit_low;
+ size = 1 + write_limit_high - write_limit_low;
+
+ return (((double) pos) / ((double) size));
+} /* }}} double get_drop_probability */
+
+static _Bool check_drop_value (void) /* {{{ */
+{
+ static cdtime_t last_message_time = 0;
+ static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+ double p;
+ double q;
+ int status;
+
+ if (write_limit_high == 0)
+ return (0);
+
+ p = get_drop_probability ();
+ if (p == 0.0)
+ return (0);
+
+ status = pthread_mutex_trylock (&last_message_lock);
+ if (status == 0)
+ {
+ cdtime_t now;
+
+ now = cdtime ();
+ if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+ {
+ last_message_time = now;
+ ERROR ("plugin_dispatch_values: Low water mark "
+ "reached. Dropping %.0f%% of metrics.",
+ 100.0 * p);
}
+ pthread_mutex_unlock (&last_message_lock);
}
- return(drop);
-}
+
+ if (p == 1.0)
+ return (1);
+
+ q = cdrand_d ();
+ if (q > p)
+ return (1);
+ else
+ return (0);
+} /* }}} _Bool check_drop_value */
int plugin_dispatch_values (value_list_t const *vl)
{
int status;
- if(drop_metric ()) return(0);
+ if (check_drop_value ())
+ return (0);
status = plugin_write_enqueue (vl);
if (status != 0)