static char *plugindir = NULL;
+#ifndef DEFAULT_MAX_READ_INTERVAL
+# define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T (86400)
+#endif
static c_heap_t *read_heap = NULL;
static llist_t *read_list;
static int read_loop = 1;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads = NULL;
static int read_threads_num = 0;
+static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
static write_queue_t *write_queue_head;
static write_queue_t *write_queue_tail;
static long write_limit_high = 0;
static long write_limit_low = 0;
+static derive_t stats_values_dropped = 0;
+static _Bool record_statistics = 0;
+
/*
* Static functions
*/
return (plugindir);
}
+static void plugin_update_internal_statistics (void) { /* {{{ */
+ derive_t copy_write_queue_length;
+ value_list_t vl = VALUE_LIST_INIT;
+ value_t values[2];
+
+ copy_write_queue_length = write_queue_length;
+
+ /* Initialize `vl' */
+ vl.values = values;
+ vl.values_len = 2;
+ vl.time = 0;
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ sstrncpy (vl.plugin, "collectd", sizeof (vl.plugin));
+
+ vl.type_instance[0] = 0;
+ vl.values_len = 1;
+
+ /* Write queue */
+ sstrncpy (vl.plugin_instance, "write_queue",
+ sizeof (vl.plugin_instance));
+
+ /* Write queue : queue length */
+ vl.values[0].gauge = (gauge_t) copy_write_queue_length;
+ sstrncpy (vl.type, "queue_length", sizeof (vl.type));
+ vl.type_instance[0] = 0;
+ plugin_dispatch_values (&vl);
+
+ /* Write queue : Values dropped (queue length > low limit) */
+ vl.values[0].derive = (derive_t) stats_values_dropped;
+ sstrncpy (vl.type, "derive", sizeof (vl.type));
+ sstrncpy (vl.type_instance, "dropped", sizeof (vl.type_instance));
+ plugin_dispatch_values (&vl);
+
+ /* Cache */
+ sstrncpy (vl.plugin_instance, "cache",
+ sizeof (vl.plugin_instance));
+
+ /* Cache : Nb entry in cache tree */
+ vl.values[0].gauge = (gauge_t) uc_get_size();
+ sstrncpy (vl.type, "cache_size", sizeof (vl.type));
+ vl.type_instance[0] = 0;
+ plugin_dispatch_values (&vl);
+
+ return;
+} /* }}} void plugin_update_internal_statistics */
+
static void destroy_callback (callback_func_t *cf) /* {{{ */
{
if (cf == NULL)
if (status != 0)
{
rf->rf_effective_interval *= 2;
- if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400))
- rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400);
+ if (rf->rf_effective_interval > max_read_interval)
+ rf->rf_effective_interval = max_read_interval;
NOTICE ("read-function of plugin `%s' failed. "
"Will suspend it for %.3f seconds.",
/* Init the value cache */
uc_init ();
+ if (IS_TRUE (global_option_get ("CollectInternalStats")))
+ record_statistics = 1;
+
chain_name = global_option_get ("PreCacheChain");
pre_cache_chain = fc_chain_get_by_name (chain_name);
le = le->next;
}
+ max_read_interval = global_option_get_time ("MaxReadInterval",
+ DEFAULT_MAX_READ_INTERVAL);
+
/* Start read-threads */
if (read_heap != NULL)
{
const char *rt;
int num;
+
rt = global_option_get ("ReadThreads");
num = atoi (rt);
if (num != -1)
/* TODO: Rename this function. */
void plugin_read_all (void)
{
+ if(record_statistics) {
+ plugin_update_internal_statistics ();
+ }
uc_check_timeout ();
return;
int plugin_dispatch_values (value_list_t const *vl)
{
int status;
+ static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
- if (check_drop_value ())
+ if (check_drop_value ()) {
+ if(record_statistics) {
+ pthread_mutex_lock(&statistics_lock);
+ stats_values_dropped++;
+ pthread_mutex_unlock(&statistics_lock);
+ }
return (0);
+ }
status = plugin_write_enqueue (vl);
if (status != 0)