/**
* collectd - src/plugin.c
- * Copyright (C) 2005-2013 Florian octo Forster
+ * Copyright (C) 2005-2014 Florian octo Forster
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; only version 2 of the License is applicable.
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
*
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
*
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
*
* Authors:
* Florian octo Forster <octo at collectd.org>
#include "utils_llist.h"
#include "utils_heap.h"
#include "utils_time.h"
+#include "utils_random.h"
#if HAVE_PTHREAD_H
# include <pthread.h>
#define rf_ctx rf_super.cf_ctx
callback_func_t rf_super;
char rf_group[DATA_MAX_NAME_LEN];
- char rf_name[DATA_MAX_NAME_LEN];
+ char *rf_name;
int rf_type;
cdtime_t rf_interval;
cdtime_t rf_effective_interval;
/*
* Private variables
*/
+static c_avl_tree_t *plugins_loaded = NULL;
+
static llist_t *list_init;
static llist_t *list_write;
static llist_t *list_flush;
static char *plugindir = NULL;
+#ifndef DEFAULT_MAX_READ_INTERVAL
+# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400)
+#endif
static c_heap_t *read_heap = NULL;
static llist_t *read_list;
static int read_loop = 1;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
static write_queue_t *write_queue_head;
static write_queue_t *write_queue_tail;
+static long write_queue_length = 0;
static _Bool write_loop = 1;
static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
static pthread_key_t plugin_ctx_key;
static _Bool plugin_ctx_key_initialized = 0;
+static long write_limit_high = 0;
+static long write_limit_low = 0;
+
+static derive_t stats_values_dropped = 0;
+static _Bool record_statistics = 0;
+
/*
* Static functions
*/
return (plugindir);
}
+static void plugin_update_internal_statistics (void) { /* {{{ */
+ derive_t copy_write_queue_length;
+ value_list_t vl = VALUE_LIST_INIT;
+ value_t values[2];
+
+ copy_write_queue_length = write_queue_length;
+
+ /* Initialize `vl' */
+ vl.values = values;
+ vl.values_len = 2;
+ vl.time = 0;
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin));
+
+ vl.type_instance[0] = 0;
+ vl.values_len = 1;
+
+ /* Write queue */
+ sstrncpy (vl.plugin_instance, "write_queue",
+ sizeof (vl.plugin_instance));
+
+ /* Write queue : queue length */
+ vl.values[0].gauge = (gauge_t) copy_write_queue_length;
+ sstrncpy (vl.type, "queue_length", sizeof (vl.type));
+ vl.type_instance[0] = 0;
+ plugin_dispatch_values (&vl);
+
+ /* Write queue : Values dropped (queue length > low limit) */
+ vl.values[0].derive = (derive_t) stats_values_dropped;
+ sstrncpy (vl.type, "derive", sizeof (vl.type));
+ sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance));
+ plugin_dispatch_values (&vl);
+
+ /* Cache */
+ sstrncpy (vl.plugin_instance, "cache",
+ sizeof (vl.plugin_instance));
+
+ /* Cache : Nb entry in cache tree */
+ vl.values[0].gauge = (gauge_t) uc_get_size();
+ sstrncpy (vl.type, "cache_size", sizeof (vl.type));
+ vl.type_instance[0] = 0;
+ plugin_dispatch_values (&vl);
+
+ return;
+} /* }}} void plugin_update_internal_statistics */
+
static void destroy_callback (callback_func_t *cf) /* {{{ */
{
if (cf == NULL)
{
DEBUG ("plugin_read_thread: Destroying the `%s' "
"callback.", rf->rf_name);
+ sfree (rf->rf_name);
destroy_callback ((callback_func_t *) rf);
rf = NULL;
continue;
if (status != 0)
{
rf->rf_effective_interval *= 2;
- if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400))
- rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400);
+ if (rf->rf_effective_interval > max_read_interval)
+ rf->rf_effective_interval = max_read_interval;
NOTICE ("read-function of plugin `%s' failed. "
"Will suspend it for %.3f seconds.",
{
write_queue_head = q;
write_queue_tail = q;
+ write_queue_length = 1;
}
else
{
write_queue_tail->next = q;
write_queue_tail = q;
+ write_queue_length += 1;
}
pthread_cond_signal (&write_cond);
q = write_queue_head;
write_queue_head = q->next;
- if (write_queue_head == NULL)
+ write_queue_length -= 1;
+ if (write_queue_head == NULL) {
write_queue_tail = NULL;
+ assert(0 == write_queue_length);
+ }
pthread_mutex_unlock (&write_lock);
}
write_queue_head = NULL;
write_queue_tail = NULL;
+ write_queue_length = 0;
pthread_mutex_unlock (&write_lock);
if (i > 0)
}
}
+static _Bool plugin_is_loaded (char const *name)
+{
+ int status;
+
+ if (plugins_loaded == NULL)
+ plugins_loaded = c_avl_create ((void *) strcasecmp);
+ assert (plugins_loaded != NULL);
+
+ status = c_avl_get (plugins_loaded, name, /* ret_value = */ NULL);
+ return (status == 0);
+}
+
+static int plugin_mark_loaded (char const *name)
+{
+ char *name_copy;
+ int status;
+
+ name_copy = strdup (name);
+ if (name_copy == NULL)
+ return (ENOMEM);
+
+ status = c_avl_insert (plugins_loaded,
+ /* key = */ name_copy, /* value = */ NULL);
+ return (status);
+}
+
+static void plugin_free_loaded ()
+{
+ void *key;
+ void *value;
+
+ if (plugins_loaded == NULL)
+ return;
+
+ while (c_avl_pick (plugins_loaded, &key, &value) == 0)
+ {
+ sfree (key);
+ assert (value == NULL);
+ }
+
+ c_avl_destroy (plugins_loaded);
+ plugins_loaded = NULL;
+}
+
#define BUFSIZE 512
-int plugin_load (const char *type, uint32_t flags)
+int plugin_load (char const *plugin_name, uint32_t flags)
{
DIR *dh;
const char *dir;
struct dirent *de;
int status;
+ if (plugin_name == NULL)
+ return (EINVAL);
+
+ /* Check if plugin is already loaded and don't do anything in this
+ * case. */
+ if (plugin_is_loaded (plugin_name))
+ return (0);
+
dir = plugin_get_dir ();
ret = 1;
+ /*
+ * XXX: Magic at work:
+ *
+ * Some of the language bindings, for example the Python and Perl
+ * plugins, need to be able to export symbols to the scripts they run.
+ * For this to happen, the "Globals" flag needs to be set.
+ * Unfortunately, this technical detail is hard to explain to the
+ * average user and she shouldn't have to worry about this, ideally.
+ * So in order to save everyone's sanity use a different default for a
+ * handful of special plugins. --octo
+ */
+ if ((strcasecmp ("perl", plugin_name) == 0)
+ || (strcasecmp ("python", plugin_name) == 0))
+ flags |= PLUGIN_FLAGS_GLOBAL;
+
/* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
* type when matching the filename */
- status = ssnprintf (typename, sizeof (typename), "%s.so", type);
+ status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name);
if ((status < 0) || ((size_t) status >= sizeof (typename)))
{
- WARNING ("plugin_load: Filename too long: \"%s.so\"", type);
+ WARNING ("plugin_load: Filename too long: \"%s.so\"", plugin_name);
return (-1);
}
typename_len = strlen (typename);
if (status == 0)
{
/* success */
+ plugin_mark_loaded (plugin_name);
ret = 0;
break;
}
else
{
ERROR ("plugin_load: Load plugin \"%s\" failed with "
- "status %i.", type, status);
+ "status %i.", plugin_name, status);
}
}
if (filename[0] == 0)
ERROR ("plugin_load: Could not find plugin \"%s\" in %s",
- type, dir);
+ plugin_name, dir);
return (ret);
}
rf->rf_udata.free_func = NULL;
rf->rf_ctx = plugin_get_ctx ();
rf->rf_group[0] = '\0';
- sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
+ rf->rf_name = strdup (name);
rf->rf_type = RF_SIMPLE;
rf->rf_interval = plugin_get_interval ();
sstrncpy (rf->rf_group, group, sizeof (rf->rf_group));
else
rf->rf_group[0] = '\0';
- sstrncpy (rf->rf_name, name, sizeof (rf->rf_name));
+ rf->rf_name = strdup (name);
rf->rf_type = RF_COMPLEX;
if (interval != NULL)
rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval);
void plugin_init_all (void)
{
- const char *chain_name;
+ char const *chain_name;
+ long write_threads_num;
llentry_t *le;
int status;
/* Init the value cache */
uc_init ();
+ if (IS_TRUE (global_option_get ("CollectInternalStats")))
+ record_statistics = 1;
+
chain_name = global_option_get ("PreCacheChain");
pre_cache_chain = fc_chain_get_by_name (chain_name);
chain_name = global_option_get ("PostCacheChain");
post_cache_chain = fc_chain_get_by_name (chain_name);
+ write_limit_high = global_option_get_long ("WriteQueueLimitHigh",
+ /* default = */ 0);
+ if (write_limit_high < 0)
{
- char const *tmp = global_option_get ("WriteThreads");
- int num = atoi (tmp);
+ ERROR ("WriteQueueLimitHigh must be positive or zero.");
+ write_limit_high = 0;
+ }
- if (num < 1)
- num = 5;
+ write_limit_low = global_option_get_long ("WriteQueueLimitLow",
+ /* default = */ write_limit_high / 2);
+ if (write_limit_low < 0)
+ {
+ ERROR ("WriteQueueLimitLow must be positive or zero.");
+ write_limit_low = write_limit_high / 2;
+ }
+ else if (write_limit_low > write_limit_high)
+ {
+ ERROR ("WriteQueueLimitLow must not be larger than "
+ "WriteQueueLimitHigh.");
+ write_limit_low = write_limit_high;
+ }
- start_write_threads ((size_t) num);
+ write_threads_num = global_option_get_long ("WriteThreads",
+ /* default = */ 5);
+ if (write_threads_num < 1)
+ {
+ ERROR ("WriteThreads must be positive.");
+ write_threads_num = 5;
}
+ start_write_threads ((size_t) write_threads_num);
+
if ((list_init == NULL) && (read_heap == NULL))
return;
le = le->next;
}
+ max_read_interval = global_option_get_time ("MaxReadInterval",
+ DEFAULT_MAX_READ_INTERVAL);
+
/* Start read-threads */
if (read_heap != NULL)
{
const char *rt;
int num;
+
rt = global_option_get ("ReadThreads");
num = atoi (rt);
if (num != -1)
/* TODO: Rename this function. */
void plugin_read_all (void)
{
+ if(record_statistics) {
+ plugin_update_internal_statistics ();
+ }
uc_check_timeout ();
return;
destroy_all_callbacks (&list_shutdown);
destroy_all_callbacks (&list_log);
+ plugin_free_loaded ();
plugin_free_data_sets ();
} /* void plugin_shutdown_all */
return (0);
} /* int plugin_dispatch_values_internal */
+static double get_drop_probability (void) /* {{{ */
+{
+ long pos;
+ long size;
+ long wql;
+
+ pthread_mutex_lock (&write_lock);
+ wql = write_queue_length;
+ pthread_mutex_unlock (&write_lock);
+
+ if (wql < write_limit_low)
+ return (0.0);
+ if (wql >= write_limit_high)
+ return (1.0);
+
+ pos = 1 + wql - write_limit_low;
+ size = 1 + write_limit_high - write_limit_low;
+
+ return (((double) pos) / ((double) size));
+} /* }}} double get_drop_probability */
+
+static _Bool check_drop_value (void) /* {{{ */
+{
+ static cdtime_t last_message_time = 0;
+ static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
+
+ double p;
+ double q;
+ int status;
+
+ if (write_limit_high == 0)
+ return (0);
+
+ p = get_drop_probability ();
+ if (p == 0.0)
+ return (0);
+
+ status = pthread_mutex_trylock (&last_message_lock);
+ if (status == 0)
+ {
+ cdtime_t now;
+
+ now = cdtime ();
+ if ((now - last_message_time) > TIME_T_TO_CDTIME_T (1))
+ {
+ last_message_time = now;
+ ERROR ("plugin_dispatch_values: Low water mark "
+ "reached. Dropping %.0f%% of metrics.",
+ 100.0 * p);
+ }
+ pthread_mutex_unlock (&last_message_lock);
+ }
+
+ if (p == 1.0)
+ return (1);
+
+ q = cdrand_d ();
+ if (q > p)
+ return (1);
+ else
+ return (0);
+} /* }}} _Bool check_drop_value */
+
int plugin_dispatch_values (value_list_t const *vl)
{
int status;
+ static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
+
+ if (check_drop_value ()) {
+ if(record_statistics) {
+ pthread_mutex_lock(&statistics_lock);
+ stats_values_dropped++;
+ pthread_mutex_unlock(&statistics_lock);
+ }
+ return (0);
+ }
status = plugin_write_enqueue (vl);
if (status != 0)
return (0);
}
+__attribute__((sentinel))
+int plugin_dispatch_multivalue (value_list_t const *template, /* {{{ */
+ _Bool store_percentage, ...)
+{
+ value_list_t *vl;
+ int failed = 0;
+ gauge_t sum = 0.0;
+ va_list ap;
+
+ assert (template->values_len == 1);
+
+ va_start (ap, store_percentage);
+ while (42)
+ {
+ char const *name;
+ gauge_t value;
+
+ name = va_arg (ap, char const *);
+ if (name == NULL)
+ break;
+
+ value = va_arg (ap, gauge_t);
+ if (!isnan (value))
+ sum += value;
+ }
+ va_end (ap);
+
+ vl = plugin_value_list_clone (template);
+ /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
+ if (store_percentage)
+ sstrncpy (vl->type, "percent", sizeof (vl->type));
+
+ va_start (ap, store_percentage);
+ while (42)
+ {
+ char const *name;
+ int status;
+
+ /* Set the type instance. */
+ name = va_arg (ap, char const *);
+ if (name == NULL)
+ break;
+ sstrncpy (vl->type_instance, name, sizeof (vl->type_instance));
+
+ /* Set the value. */
+ vl->values[0].gauge = va_arg (ap, gauge_t);
+ if (store_percentage)
+ vl->values[0].gauge *= 100.0 / sum;
+
+ status = plugin_write_enqueue (vl);
+ if (status != 0)
+ failed++;
+ }
+ va_end (ap);
+
+ plugin_value_list_free (vl);
+ return (failed);
+} /* }}} int plugin_dispatch_multivalue */
+
int plugin_dispatch_notification (const notification_t *notif)
{
llentry_t *le;