X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fplugin.c;h=9c455417c82c35f0f0a87d320e12a79f9ccf8a8c;hb=89fc8d1d16779841bd693020d9035aa5f35f127d;hp=0aee64667d200651b548573bb30138e5c04fb8bb;hpb=4088bccc40644f45971304f4c73bc40dcfb87957;p=collectd.git diff --git a/src/plugin.c b/src/plugin.c index 0aee6466..9c455417 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -121,7 +121,6 @@ static _Bool plugin_ctx_key_initialized = 0; static long write_limit_high = 0; static long write_limit_low = 0; -cdtime_t last_drop_time = 0; /* * Static functions @@ -182,13 +181,13 @@ static void destroy_read_heap (void) /* {{{ */ while (42) { - callback_func_t *cf; + read_func_t *rf; - cf = c_heap_get_root (read_heap); - if (cf == NULL) + rf = c_heap_get_root (read_heap); + if (rf == NULL) break; - - destroy_callback (cf); + sfree (rf->rf_name); + destroy_callback ((callback_func_t *) rf); } c_heap_destroy (read_heap); @@ -340,7 +339,7 @@ static int plugin_load_file (char *file, uint32_t flags) ssnprintf (errbuf, sizeof (errbuf), "lt_dlopen (\"%s\") failed: %s. " - "The most common cause for this problem are " + "The most common cause for this problem is " "missing dependencies. Use ldd(1) to check " "the dependencies of the plugin " "/ shared object.", @@ -896,7 +895,6 @@ int plugin_load (char const *plugin_name, uint32_t flags) const char *dir; char filename[BUFSIZE] = ""; char typename[BUFSIZE]; - int typename_len; int ret; struct stat statbuf; struct dirent *de; @@ -936,7 +934,6 @@ int plugin_load (char const *plugin_name, uint32_t flags) WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name); return (-1); } - typename_len = strlen (typename); if ((dh = opendir (dir)) == NULL) { @@ -948,7 +945,7 @@ int plugin_load (char const *plugin_name, uint32_t flags) while ((de = readdir (dh)) != NULL) { - if (strncasecmp (de->d_name, typename, typename_len)) + if (strcasecmp (de->d_name, typename)) continue; status = ssnprintf (filename, sizeof (filename), @@ -1136,8 +1133,10 @@ int plugin_register_read (const char *name, rf->rf_interval = plugin_get_interval (); status = plugin_insert_read (rf); - if (status != 0) + if (status != 0) { + sfree (rf->rf_name); sfree (rf); + } return (status); } /* int plugin_register_read */ @@ -1184,8 +1183,10 @@ int plugin_register_complex_read (const char *group, const char *name, rf->rf_ctx = plugin_get_ctx (); status = plugin_insert_read (rf); - if (status != 0) + if (status != 0) { + sfree (rf->rf_name); sfree (rf); + } return (status); } /* int plugin_register_complex_read */ @@ -1218,6 +1219,27 @@ int plugin_register_shutdown (const char *name, (void *) callback, /* user_data = */ NULL)); } /* int plugin_register_shutdown */ +static void plugin_free_data_sets (void) +{ + void *key; + void *value; + + if (data_sets == NULL) + return; + + while (c_avl_pick (data_sets, &key, &value) == 0) + { + data_set_t *ds = value; + /* key is a pointer to ds->type */ + + sfree (ds->ds); + sfree (ds); + } + + c_avl_destroy (data_sets); + data_sets = NULL; +} /* void plugin_free_data_sets */ + int plugin_register_data_set (const data_set_t *ds) { data_set_t *ds_copy; @@ -1435,7 +1457,7 @@ int plugin_unregister_notification (const char *name) void plugin_init_all (void) { - const char *chain_name; + char const *chain_name; llentry_t *le; int status; @@ -1448,17 +1470,34 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); - write_limit_high = global_option_get_long_in_range("WriteQueueLengthLimitHigh",0, 0, LONG_MAX); - write_limit_low = global_option_get_long_in_range("WriteQueueLengthLimitLow", (write_limit_high+1)/2, 0, (write_limit_high == 0) ? 0 : write_limit_high-1 ); - + write_limit_high = global_option_get_long ("WriteQueueLimitHigh", + /* default = */ 0); + if (write_limit_high < 0) { - char const *tmp = global_option_get ("WriteThreads"); - int num = atoi (tmp); + ERROR ("WriteQueueLimitHigh must be positive or zero."); + write_limit_high = 0; + } - if (num < 1) - num = 5; + write_limit_low = global_option_get_long ("WriteQueueLimitLow", + /* default = */ write_limit_high / 2); + if (write_limit_low < 0) + { + ERROR ("WriteQueueLimitLow must be positive or zero."); + write_limit_low = write_limit_high / 2; + } + else if (write_limit_low > write_limit_high) + { + ERROR ("WriteQueueLimitLow must not be larger than " + "WriteQueueLimitHigh."); + write_limit_low = write_limit_high; + } - start_write_threads ((size_t) num); + write_threads_num = global_option_get_long ("WriteThreads", + /* default = */ 5); + if (write_threads_num < 1) + { + ERROR ("WriteThreads must be positive."); + write_threads_num = 5; } if ((list_init == NULL) && (read_heap == NULL)) @@ -1496,6 +1535,8 @@ void plugin_init_all (void) le = le->next; } + start_write_threads ((size_t) write_threads_num); + /* Start read-threads */ if (read_heap != NULL) { @@ -1563,6 +1604,7 @@ int plugin_read_all_once (void) return_status = -1; } + sfree (rf->rf_name); destroy_callback ((void *) rf); } @@ -1745,6 +1787,7 @@ void plugin_shutdown_all (void) destroy_all_callbacks (&list_log); plugin_free_loaded (); + plugin_free_data_sets (); } /* void plugin_shutdown_all */ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ @@ -1965,54 +2008,75 @@ static int plugin_dispatch_values_internal (value_list_t *vl) return (0); } /* int plugin_dispatch_values_internal */ -static _Bool drop_metric(void) { - _Bool drop = 0; - int wq_len = write_queue_length; - /* We store write_queue_length in a local variable because other threads may update write_queue_length. - * Having this in a local variable (like a cache) is better : we do not need a lock */ - - if(wq_len < write_limit_low) return(0); - - if((write_limit_high > 0) && (wq_len > write_limit_low)) { - if(wq_len >= write_limit_high) { - /* if high == low, we come here too */ - drop = 1; - } else { - /* here, high != low */ - long probability_to_drop; - long n; - - probability_to_drop = (wq_len - write_limit_low); - - n = cdrand_range(write_limit_low, write_limit_high); - - /* Let's have X = high - low. - * n is in range [0..X] - * probability_to_drop is in range [1..X[ - * probability_to_drop gets bigger when wq_len gets bigger. - */ - if(n <= probability_to_drop) { - drop = 1; - } - } - } - if(drop) { - cdtime_t now = cdtime(); - if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) { - last_drop_time = now; - /* If you want to count dropped metrics, don't forget to add a lock here */ - /* dropped_metrics++; */ - ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric"); +static double get_drop_probability (void) /* {{{ */ +{ + long pos; + long size; + long wql; + + pthread_mutex_lock (&write_lock); + wql = write_queue_length; + pthread_mutex_unlock (&write_lock); + + if (wql < write_limit_low) + return (0.0); + if (wql >= write_limit_high) + return (1.0); + + pos = 1 + wql - write_limit_low; + size = 1 + write_limit_high - write_limit_low; + + return (((double) pos) / ((double) size)); +} /* }}} double get_drop_probability */ + +static _Bool check_drop_value (void) /* {{{ */ +{ + static cdtime_t last_message_time = 0; + static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER; + + double p; + double q; + int status; + + if (write_limit_high == 0) + return (0); + + p = get_drop_probability (); + if (p == 0.0) + return (0); + + status = pthread_mutex_trylock (&last_message_lock); + if (status == 0) + { + cdtime_t now; + + now = cdtime (); + if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1)) + { + last_message_time = now; + ERROR ("plugin_dispatch_values: Low water mark " + "reached. Dropping %.0f%% of metrics.", + 100.0 * p); } + pthread_mutex_unlock (&last_message_lock); } - return(drop); -} + + if (p == 1.0) + return (1); + + q = cdrand_d (); + if (q > p) + return (1); + else + return (0); +} /* }}} _Bool check_drop_value */ int plugin_dispatch_values (value_list_t const *vl) { int status; - if(drop_metric ()) return(0); + if (check_drop_value ()) + return (0); status = plugin_write_enqueue (vl); if (status != 0)