/**
* collectd - src/plugin.c
- * Copyright (C) 2005-2013 Florian octo Forster
+ * Copyright (C) 2005-2014 Florian octo Forster
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; only version 2 of the License is applicable.
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
*
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
*
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
*
* Authors:
* Florian octo Forster <octo at collectd.org>
static char *plugindir = NULL;
+#ifndef DEFAULT_MAX_READ_INTERVAL
+# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400)
+#endif
static c_heap_t *read_heap = NULL;
static llist_t *read_list;
static int read_loop = 1;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
static write_queue_t *write_queue_head;
static write_queue_t *write_queue_tail;
static long write_limit_high = 0;
static long write_limit_low = 0;
-cdtime_t last_drop_time = 0;
+
+static derive_t stats_values_dropped = 0;
+static _Bool record_statistics = 0;
/*
* Static functions
return (plugindir);
}
+static void plugin_update_internal_statistics (void) { /* {{{ */
+ derive_t copy_write_queue_length;
+ value_list_t vl = VALUE_LIST_INIT;
+ value_t values[2];
+
+ copy_write_queue_length = write_queue_length;
+
+ /* Initialize `vl' */
+ vl.values = values;
+ vl.values_len = 2;
+ vl.time = 0;
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin));
+
+ vl.type_instance[0] = 0;
+ vl.values_len = 1;
+
+ /* Write queue */
+ sstrncpy (vl.plugin_instance, "write_queue",
+ sizeof (vl.plugin_instance));
+
+ /* Write queue : queue length */
+ vl.values[0].gauge = (gauge_t) copy_write_queue_length;
+ sstrncpy (vl.type, "queue_length", sizeof (vl.type));
+ vl.type_instance[0] = 0;
+ plugin_dispatch_values (&vl);
+
+ /* Write queue : Values dropped (queue length > low limit) */
+ vl.values[0].derive = (derive_t) stats_values_dropped;
+ sstrncpy (vl.type, "derive", sizeof (vl.type));
+ sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance));
+ plugin_dispatch_values (&vl);
+
+ /* Cache */
+ sstrncpy (vl.plugin_instance, "cache",
+ sizeof (vl.plugin_instance));
+
+ /* Cache : Nb entry in cache tree */
+ vl.values[0].gauge = (gauge_t) uc_get_size();
+ sstrncpy (vl.type, "cache_size", sizeof (vl.type));
+ vl.type_instance[0] = 0;
+ plugin_dispatch_values (&vl);
+
+ return;
+} /* }}} void plugin_update_internal_statistics */
+
static void destroy_callback (callback_func_t *cf) /* {{{ */
{
if (cf == NULL)
if (status != 0)
{
rf->rf_effective_interval *= 2;
- if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400))
- rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400);
+ if (rf->rf_effective_interval > max_read_interval)
+ rf->rf_effective_interval = max_read_interval;
NOTICE ("read-function of plugin `%s' failed. "
"Will suspend it for %.3f seconds.",
(void *) callback, /* user_data = */ NULL));
} /* int plugin_register_shutdown */
+static void plugin_free_data_sets (void)
+{
+ void *key;
+ void *value;
+
+ if (data_sets == NULL)
+ return;
+
+ while (c_avl_pick (data_sets, &key, &value) == 0)
+ {
+ data_set_t *ds = value;
+ /* key is a pointer to ds->type */
+
+ sfree (ds->ds);
+ sfree (ds);
+ }
+
+ c_avl_destroy (data_sets);
+ data_sets = NULL;
+} /* void plugin_free_data_sets */
+
int plugin_register_data_set (const data_set_t *ds)
{
data_set_t *ds_copy;
void plugin_init_all (void)
{
- const char *chain_name;
+ char const *chain_name;
+ long write_threads_num;
llentry_t *le;
int status;
/* Init the value cache */
uc_init ();
+ if (IS_TRUE (global_option_get ("CollectInternalStats")))
+ record_statistics = 1;
+
chain_name = global_option_get ("PreCacheChain");
pre_cache_chain = fc_chain_get_by_name (chain_name);
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
- write_limit_high = global_option_get_long_in_range("WriteQueueLengthLimitHigh",0, 0, LONG_MAX);
- write_limit_low = global_option_get_long_in_range("WriteQueueLengthLimitLow", (write_limit_high+1)/2, 0, (write_limit_high == 0) ? 0 : write_limit_high-1 );
-
+ write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
+ /* default = */ 0);
+ if (write_limit_high < 0)
{
- char const *tmp = global_option_get ("WriteThreads");
- int num = atoi (tmp);
+ ERROR ("WriteQueueLimitHigh must be positive or zero.");
+ write_limit_high = 0;
+ }
- if (num < 1)
- num = 5;
+ write_limit_low = global_option_get_long ("WriteQueueLimitLow",
+ /* default = */ write_limit_high / 2);
+ if (write_limit_low < 0)
+ {
+ ERROR ("WriteQueueLimitLow must be positive or zero.");
+ write_limit_low = write_limit_high / 2;
+ }
+ else if (write_limit_low > write_limit_high)
+ {
+ ERROR ("WriteQueueLimitLow must not be larger than "
+ "WriteQueueLimitHigh.");
+ write_limit_low = write_limit_high;
+ }
- start_write_threads ((size_t) num);
+ write_threads_num = global_option_get_long ("WriteThreads",
+ /* default = */ 5);
+ if (write_threads_num < 1)
+ {
+ ERROR ("WriteThreads must be positive.");
+ write_threads_num = 5;
}
+ start_write_threads ((size_t) write_threads_num);
+
if ((list_init == NULL) && (read_heap == NULL))
return;
le = le->next;
}
+ max_read_interval = global_option_get_time ("MaxReadInterval",
+ DEFAULT_MAX_READ_INTERVAL);
+
/* Start read-threads */
if (read_heap != NULL)
{
const char *rt;
int num;
+
rt = global_option_get ("ReadThreads");
num = atoi (rt);
if (num != -1)
/* TODO: Rename this function. */
void plugin_read_all (void)
{
+ if(record_statistics) {
+ plugin_update_internal_statistics ();
+ }
uc_check_timeout ();
return;
destroy_all_callbacks (&list_log);
plugin_free_loaded ();
+ plugin_free_data_sets ();
} /* void plugin_shutdown_all */
int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */
return (0);
} /* int plugin_dispatch_values_internal */
-static _Bool drop_metric(void) {
- _Bool drop = 0;
- int wq_len = write_queue_length;
- /* We store write_queue_length in a local variable because other threads may update write_queue_length.
- * Having this in a local variable (like a cache) is better : we do not need a lock */
-
- if(wq_len < write_limit_low) return(0);
-
- if((write_limit_high > 0) && (wq_len > write_limit_low)) {
- if(wq_len >= write_limit_high) {
- /* if high == low, we come here too */
- drop = 1;
- } else {
- /* here, high != low */
- long probability_to_drop;
- long n;
-
- probability_to_drop = (wq_len - write_limit_low);
-
- n = cdrand_range(write_limit_low, write_limit_high);
-
- /* Let's have X = high - low.
- * n is in range [0..X]
- * probability_to_drop is in range [1..X[
- * probability_to_drop gets bigger when wq_len gets bigger.
- */
- if(n <= probability_to_drop) {
- drop = 1;
- }
- }
- }
- if(drop) {
- cdtime_t now = cdtime();
- if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) {
- last_drop_time = now;
- /* If you want to count dropped metrics, don't forget to add a lock here */
- /* dropped_metrics++; */
- ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric");
+static double get_drop_probability (void) /* {{{ */
+{
+ long pos;
+ long size;
+ long wql;
+
+ pthread_mutex_lock (&write_lock);
+ wql = write_queue_length;
+ pthread_mutex_unlock (&write_lock);
+
+ if (wql < write_limit_low)
+ return (0.0);
+ if (wql >= write_limit_high)
+ return (1.0);
+
+ pos = 1 + wql - write_limit_low;
+ size = 1 + write_limit_high - write_limit_low;
+
+ return (((double) pos) / ((double) size));
+} /* }}} double get_drop_probability */
+
+static _Bool check_drop_value (void) /* {{{ */
+{
+ static cdtime_t last_message_time = 0;
+ static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+ double p;
+ double q;
+ int status;
+
+ if (write_limit_high == 0)
+ return (0);
+
+ p = get_drop_probability ();
+ if (p == 0.0)
+ return (0);
+
+ status = pthread_mutex_trylock (&last_message_lock);
+ if (status == 0)
+ {
+ cdtime_t now;
+
+ now = cdtime ();
+ if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+ {
+ last_message_time = now;
+ ERROR ("plugin_dispatch_values: Low water mark "
+ "reached. Dropping %.0f%% of metrics.",
+ 100.0 * p);
}
+ pthread_mutex_unlock (&last_message_lock);
}
- return(drop);
-}
+
+ if (p == 1.0)
+ return (1);
+
+ q = cdrand_d ();
+ if (q > p)
+ return (1);
+ else
+ return (0);
+} /* }}} _Bool check_drop_value */
int plugin_dispatch_values (value_list_t const *vl)
{
int status;
+ static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
- if(drop_metric ()) return(0);
+ if (check_drop_value ()) {
+ if(record_statistics) {
+ pthread_mutex_lock(&statistics_lock);
+ stats_values_dropped++;
+ pthread_mutex_unlock(&statistics_lock);
+ }
+ return (0);
+ }
status = plugin_write_enqueue (vl);
if (status != 0)
return (0);
}
+__attribute__((sentinel))
+int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */
+ _Bool store_percentage, ...)
+{
+ value_list_t *vl;
+ int failed = 0;
+ gauge_t sum = 0.0;
+ va_list ap;
+
+ assert (template->values_len == 1);
+
+ va_start (ap, store_percentage);
+ while (42)
+ {
+ char const *name;
+ gauge_t value;
+
+ name = va_arg (ap, char const *);
+ if (name == NULL)
+ break;
+
+ value = va_arg (ap, gauge_t);
+ if (!isnan (value))
+ sum += value;
+ }
+ va_end (ap);
+
+ vl = plugin_value_list_clone (template);
+ /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
+ if (store_percentage)
+ sstrncpy (vl->type, "percent", sizeof (vl->type));
+
+ va_start (ap, store_percentage);
+ while (42)
+ {
+ char const *name;
+ int status;
+
+ /* Set the type instance. */
+ name = va_arg (ap, char const *);
+ if (name == NULL)
+ break;
+ sstrncpy (vl->type_instance, name, sizeof (vl->type_instance));
+
+ /* Set the value. */
+ vl->values[0].gauge = va_arg (ap, gauge_t);
+ if (store_percentage)
+ vl->values[0].gauge *= 100.0 / sum;
+
+ status = plugin_write_enqueue (vl);
+ if (status != 0)
+ failed++;
+ }
+ va_end (ap);
+
+ plugin_value_list_free (vl);
+ return (failed);
+} /* }}} int plugin_dispatch_multivalue */
+
int plugin_dispatch_notification (const notification_t *notif)
{
llentry_t *le;