Fixed a bug with WriteQueueLengthLimitHigh is null/uninitialized
[collectd.git] / src / plugin.c
index 386be32..0aee646 100644 (file)
@@ -31,6 +31,7 @@
 #include "utils_llist.h"
 #include "utils_heap.h"
 #include "utils_time.h"
+#include "utils_random.h"
 
 #if HAVE_PTHREAD_H
 # include <pthread.h>
@@ -108,6 +109,7 @@ static int             read_threads_num = 0;
 
 static write_queue_t  *write_queue_head;
 static write_queue_t  *write_queue_tail;
+static long            write_queue_length = 0;
 static _Bool           write_loop = 1;
 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  write_cond = PTHREAD_COND_INITIALIZER;
@@ -117,6 +119,10 @@ static size_t          write_threads_num = 0;
 static pthread_key_t   plugin_ctx_key;
 static _Bool           plugin_ctx_key_initialized = 0;
 
+static long            write_limit_high = 0;
+static long            write_limit_low = 0;
+cdtime_t               last_drop_time = 0;
+
 /*
  * Static functions
  */
@@ -670,11 +676,13 @@ static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
        {
                write_queue_head = q;
                write_queue_tail = q;
+               write_queue_length = 1;
        }
        else
        {
                write_queue_tail->next = q;
                write_queue_tail = q;
+               write_queue_length += 1;
        }
 
        pthread_cond_signal (&write_cond);
@@ -701,8 +709,11 @@ static value_list_t *plugin_write_dequeue (void) /* {{{ */
 
        q = write_queue_head;
        write_queue_head = q->next;
-       if (write_queue_head == NULL)
+       write_queue_length -= 1;
+       if (write_queue_head == NULL) {
                write_queue_tail = NULL;
+               assert(0 == write_queue_length);
+               }
 
        pthread_mutex_unlock (&write_lock);
 
@@ -805,6 +816,7 @@ static void stop_write_threads (void) /* {{{ */
        }
        write_queue_head = NULL;
        write_queue_tail = NULL;
+       write_queue_length = 0;
        pthread_mutex_unlock (&write_lock);
 
        if (i > 0)
@@ -901,6 +913,21 @@ int plugin_load (char const *plugin_name, uint32_t flags)
        dir = plugin_get_dir ();
        ret = 1;
 
+       /*
+        * XXX: Magic at work:
+        *
+        * Some of the language bindings, for example the Python and Perl
+        * plugins, need to be able to export symbols to the scripts they run.
+        * For this to happen, the "Globals" flag needs to be set.
+        * Unfortunately, this technical detail is hard to explain to the
+        * average user and she shouldn't have to worry about this, ideally.
+        * So in order to save everyone's sanity use a different default for a
+        * handful of special plugins. --octo
+        */
+       if ((strcasecmp ("perl", plugin_name) == 0)
+                       || (strcasecmp ("python", plugin_name) == 0))
+               flags |= PLUGIN_FLAGS_GLOBAL;
+
        /* `cpu' should not match `cpufreq'. To solve this we add `.so' to the
         * type when matching the filename */
        status = ssnprintf (typename, sizeof (typename), "%s.so", plugin_name);
@@ -1421,6 +1448,9 @@ void plugin_init_all (void)
        chain_name = global_option_get ("PostCacheChain");
        post_cache_chain = fc_chain_get_by_name (chain_name);
 
+       write_limit_high = global_option_get_long_in_range("WriteQueueLengthLimitHigh",0, 0, LONG_MAX);
+       write_limit_low = global_option_get_long_in_range("WriteQueueLengthLimitLow", (write_limit_high+1)/2, 0, (write_limit_high == 0) ? 0 : write_limit_high-1 );
+
        {
                char const *tmp = global_option_get ("WriteThreads");
                int num = atoi (tmp);
@@ -1935,10 +1965,55 @@ static int plugin_dispatch_values_internal (value_list_t *vl)
        return (0);
 } /* int plugin_dispatch_values_internal */
 
+static _Bool drop_metric(void) {
+       _Bool drop = 0;
+       int wq_len = write_queue_length;
+       /* We store write_queue_length in a local variable because other threads may update write_queue_length.
+        * Having this in a local variable (like a cache) is better : we do not need a lock */
+
+       if(wq_len < write_limit_low) return(0);
+
+       if((write_limit_high > 0) && (wq_len > write_limit_low)) {
+               if(wq_len >= write_limit_high) {
+                       /* if high == low, we come here too */
+                       drop = 1;
+               } else {
+                       /* here, high != low */
+                       long probability_to_drop;
+                       long n;
+
+                       probability_to_drop = (wq_len - write_limit_low);
+
+                       n = cdrand_range(write_limit_low, write_limit_high);
+
+                       /* Let's have X = high - low.
+                        *   n is in range [0..X]
+                        *   probability_to_drop is in range [1..X[
+                        *   probability_to_drop gets bigger when wq_len gets bigger.
+                        */
+                       if(n <= probability_to_drop) {
+                               drop = 1;
+                       }
+               }
+       }
+       if(drop) {
+               cdtime_t now = cdtime();
+               if((now - last_drop_time) > TIME_T_TO_CDTIME_T (60)) {
+                       last_drop_time = now;
+                       /* If you want to count dropped metrics, don't forget to add a lock here */
+                       /* dropped_metrics++; */
+                       ERROR ("plugin_dispatch_values : Low water mark reached, dropping a metric");
+               }
+       }
+       return(drop);
+}
+
 int plugin_dispatch_values (value_list_t const *vl)
 {
        int status;
 
+       if(drop_metric ()) return(0);
+
        status = plugin_write_enqueue (vl);
        if (status != 0)
        {