network plugin: Implemented duplicate detection and a `Forward' option.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 27 Mar 2007 16:58:56 +0000 (18:58 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 27 Mar 2007 16:58:56 +0000 (18:58 +0200)
The plugin will now only send values received via the network, if the `Forward'
option is set to `true'. Also, duplicates are detected and discarded,
preventing loops, duplicate entries and errors from RRDTool.

src/collectd.conf.in
src/collectd.conf.pod
src/network.c

index d8432c5..2b34946 100644 (file)
 #      Listen "ff18::efc0:4a42" "25826"
 #      Listen "239.192.74.66" "25826"
 #      TimeToLive "128"
+#      Forward false
 #</Plugin>
 
 #<Plugin ntpd>
index 6d42f07..02e6def 100644 (file)
@@ -370,6 +370,15 @@ multicast, and IPv4 and IPv6 packets. The default is to not change this value.
 That means that multicast packets will be sent with a TTL of C<1> (one) on most
 operating systems.
 
+=item B<Forward> I<true|false>
+
+If set to I<true>, write packets that were received via the network plugin to
+the sending sockets. This should only be activated when the B<Listen>- and
+B<Server>-statements differ. Otherwise packets may be send multiple times to
+the same multicast group. While this results in more network traffic than
+neccessary it's not a huge problem since the plugin has a duplicate detection,
+so the values will not loop.
+
 =back
 
 =head2 Plugin C<ntpd>
index 35ffb39..3c258f4 100644 (file)
@@ -23,6 +23,7 @@
 #include "plugin.h"
 #include "common.h"
 #include "configfile.h"
+#include "utils_avltree.h"
 
 #include "network.h"
 
@@ -143,11 +144,12 @@ static const char *config_keys[] =
        "Listen",
        "Server",
        "TimeToLive",
-       NULL
+       "Forward"
 };
-static int config_keys_num = 3;
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
 static int network_config_ttl = 0;
+static int network_config_forward = 0;
 
 static sockent_t *sending_sockets = NULL;
 
@@ -163,9 +165,68 @@ static value_list_t send_buffer_vl = VALUE_LIST_INIT;
 static char         send_buffer_type[DATA_MAX_NAME_LEN];
 static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
 
+static avl_tree_t      *cache_tree = NULL;
+static pthread_mutex_t  cache_lock = PTHREAD_MUTEX_INITIALIZER;
+
 /*
  * Private functions
  */
+static int cache_check (const char *type, const value_list_t *vl)
+{
+       char key[1024];
+       time_t *value = NULL;
+       int retval = -1;
+
+       if (cache_tree == NULL)
+               return (-1);
+
+       if (format_name (key, sizeof (key), vl->host, vl->plugin,
+                               vl->plugin_instance, type, vl->type_instance))
+               return (-1);
+
+       pthread_mutex_lock (&cache_lock);
+
+       if (avl_get (cache_tree, key, (void *) &value) == 0)
+       {
+               if (*value < vl->time)
+               {
+                       *value = vl->time;
+                       retval = 0;
+               }
+               else
+               {
+                       DEBUG ("network plugin: cache_check: *value = %i >= vl->time = %i",
+                                       (int) *value, (int) vl->time);
+                       retval = 1;
+               }
+       }
+       else
+       {
+               char *key_copy = strdup (key);
+               value = malloc (sizeof (time_t));
+               if ((key_copy != NULL) && (value != NULL))
+               {
+                       *value = vl->time;
+                       avl_insert (cache_tree, key_copy, value);
+                       retval = 0;
+               }
+               else
+               {
+                       sfree (key_copy);
+                       sfree (value);
+               }
+       }
+
+       /* TODO: Flush cache */
+
+       pthread_mutex_unlock (&cache_lock);
+
+       DEBUG ("network plugin: cache_check: key = %s; time = %i; retval = %i",
+                       key, (int) vl->time, retval);
+
+       return (retval);
+} /* int cache_check */
+
 static int write_part_values (char **ret_buffer, int *ret_buffer_len,
                const data_set_t *ds, const value_list_t *vl)
 {
@@ -413,7 +474,8 @@ static int parse_packet (void *buffer, int buffer_len)
                        if ((vl.time > 0)
                                        && (strlen (vl.host) > 0)
                                        && (strlen (vl.plugin) > 0)
-                                       && (strlen (type) > 0))
+                                       && (strlen (type) > 0)
+                                       && (cache_check (type, &vl) == 0))
                        {
                                DEBUG ("dispatching values");
                                plugin_dispatch_values (type, &vl);
@@ -458,7 +520,7 @@ static int parse_packet (void *buffer, int buffer_len)
                {
                        status = parse_part_string (&buffer, &buffer_len,
                                        vl.type_instance, sizeof (vl.type_instance));
-                       DEBUG ("network type: parse_packet: vl.type_instance = %s", vl.type_instance);
+                       DEBUG ("network plugin: parse_packet: vl.type_instance = %s", vl.type_instance);
                }
                else
                {
@@ -956,7 +1018,7 @@ static int add_to_buffer (char *buffer, int buffer_size,
                                        vl->host, strlen (vl->host)) != 0)
                        return (-1);
                strcpy (vl_def->host, vl->host);
-               DEBUG ("host = %s", vl->host);
+               DEBUG ("network plugin: add_to_buffer: host = %s", vl->host);
        }
 
        if (vl_def->time != vl->time)
@@ -965,7 +1027,8 @@ static int add_to_buffer (char *buffer, int buffer_size,
                                        (uint64_t) vl->time))
                        return (-1);
                vl_def->time = vl->time;
-               DEBUG ("time = %u", (unsigned int) vl->time);
+               DEBUG ("network plugin: add_to_buffer: time = %u",
+                               (unsigned int) vl->time);
        }
 
        if (strcmp (vl_def->plugin, vl->plugin) != 0)
@@ -974,7 +1037,8 @@ static int add_to_buffer (char *buffer, int buffer_size,
                                        vl->plugin, strlen (vl->plugin)) != 0)
                        return (-1);
                strcpy (vl_def->plugin, vl->plugin);
-               DEBUG ("plugin = %s", vl->plugin);
+               DEBUG ("network plugin: add_to_buffer: plugin = %s",
+                               vl->plugin);
        }
 
        if (strcmp (vl_def->plugin_instance, vl->plugin_instance) != 0)
@@ -984,7 +1048,8 @@ static int add_to_buffer (char *buffer, int buffer_size,
                                        strlen (vl->plugin_instance)) != 0)
                        return (-1);
                strcpy (vl_def->plugin_instance, vl->plugin_instance);
-               DEBUG ("plugin_instance = %s", vl->plugin_instance);
+               DEBUG ("network plugin: add_to_buffer: plugin_instance = %s",
+                               vl->plugin_instance);
        }
 
        if (strcmp (type_def, ds->type) != 0)
@@ -993,7 +1058,7 @@ static int add_to_buffer (char *buffer, int buffer_size,
                                        ds->type, strlen (ds->type)) != 0)
                        return (-1);
                strcpy (type_def, ds->type);
-               DEBUG ("type = %s", ds->type);
+               DEBUG ("network plugin: add_to_buffer: type = %s", ds->type);
        }
 
        if (strcmp (vl_def->type_instance, vl->type_instance) != 0)
@@ -1003,7 +1068,8 @@ static int add_to_buffer (char *buffer, int buffer_size,
                                        strlen (vl->type_instance)) != 0)
                        return (-1);
                strcpy (vl_def->type_instance, vl->type_instance);
-               DEBUG ("type_instance = %s", vl->type_instance);
+               DEBUG ("network plugin: add_to_buffer: type_instance = %s",
+                               vl->type_instance);
        }
        
        if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
@@ -1025,6 +1091,14 @@ static int network_write (const data_set_t *ds, const value_list_t *vl)
 {
        int status;
 
+       /* If the value is already in the cache, we have received it via the
+        * network. We write it again if forwarding is activated. It's then in
+        * the cache and should we receive it again we will ignore it. */
+       status = cache_check (ds->type, vl);
+       if ((network_config_forward == 0)
+                       && (status != 0))
+               return (0);
+
        pthread_mutex_lock (&send_buffer_lock);
 
        status = add_to_buffer (send_buffer_ptr,
@@ -1105,6 +1179,15 @@ static int network_config (const char *key, const char *val)
                else
                        return (1);
        }
+       else if (strcasecmp ("Forward", key) == 0)
+       {
+               if ((strcasecmp ("true", value) == 0)
+                               || (strcasecmp ("yes", value) == 0)
+                               || (strcasecmp ("on", value) == 0))
+                       network_config_forward = 1;
+               else
+                       network_config_forward = 0;
+       }
        else
        {
                return (-1);
@@ -1127,6 +1210,20 @@ static int network_shutdown (void)
 
        listen_thread = 0;
 
+       if (cache_tree != NULL)
+       {
+               void *key;
+               void *value;
+
+               while (avl_pick (cache_tree, &key, &value) == 0)
+               {
+                       sfree (key);
+                       sfree (value);
+               }
+               avl_destroy (cache_tree);
+               cache_tree = NULL;
+       }
+
        /* TODO: Close `sending_sockets' */
 
        plugin_unregister_config ("network");
@@ -1146,6 +1243,8 @@ static int network_init (void)
        memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl));
        memset (send_buffer_type, '\0', sizeof (send_buffer_type));
 
+       cache_tree = avl_create ((int (*) (const void *, const void *)) strcmp);
+
        /* setup socket(s) and so on */
        if (sending_sockets != NULL)
                plugin_register_write ("network", network_write);