network plugin: Use the meta data to implement the `Forward' option.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 4 Aug 2009 11:02:57 +0000 (13:02 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Tue, 4 Aug 2009 11:02:57 +0000 (13:02 +0200)
Previously, a cache in the network plugin was used to keep track of
which values were received via the network in order to distinguish
between ``forwarded'' values and values that were received from
somewhere else.

The same cache was also used to avoid loops when forwarding packages by
keeping track of the highest timestamp that was sent by the plugin and
discard received data that was older or as old as that.

This information is not kept in the meta data of the global cache (what
is the last timestamp sent) and the meta data of the value list (was
this value list received via the network?). The cache that was
maintained in the network plugin has been removed.

src/network.c

index 75f52df..0e416bd 100644 (file)
@@ -27,6 +27,7 @@
 #include "configfile.h"
 #include "utils_fbhash.h"
 #include "utils_avltree.h"
+#include "utils_cache.h"
 
 #include "network.h"
 
@@ -283,127 +284,98 @@ static int              send_buffer_fill;
 static value_list_t     send_buffer_vl = VALUE_LIST_STATIC;
 static pthread_mutex_t  send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
 
-/* In this cache we store all the values we received, so we can send out only
- * those values which were *not* received via the network plugin, too. This is
- * used for the `Forward false' option. */
-static c_avl_tree_t    *cache_tree = NULL;
-static pthread_mutex_t  cache_lock = PTHREAD_MUTEX_INITIALIZER;
-static time_t           cache_flush_last = 0;
-static int              cache_flush_interval = 1800;
-
 /*
  * Private functions
  */
-static int cache_flush (void)
+static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */
 {
-       char **keys = NULL;
-       int    keys_num = 0;
+  uint64_t time_sent = 0;
+  int status;
 
-       char **tmp;
-       int    i;
+  status = uc_meta_data_get_unsigned_int (vl,
+      "network:time_sent", &time_sent);
 
-       char   *key;
-       time_t *value;
-       c_avl_iterator_t *iter;
+  /* This is a value we already sent. Don't allow it to be received again in
+   * order to avoid looping. */
+  if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
+    return (false);
 
-       time_t curtime = time (NULL);
+  return (true);
+} /* }}} _Bool check_receive_okay */
 
-       iter = c_avl_get_iterator (cache_tree);
-       while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
-       {
-               if ((curtime - *value) <= cache_flush_interval)
-                       continue;
-               tmp = (char **) realloc (keys,
-                               (keys_num + 1) * sizeof (char *));
-               if (tmp == NULL)
-               {
-                       sfree (keys);
-                       c_avl_iterator_destroy (iter);
-                       ERROR ("network plugin: cache_flush: realloc"
-                                       " failed.");
-                       return (-1);
-               }
-               keys = tmp;
-               keys[keys_num] = key;
-               keys_num++;
-       } /* while (c_avl_iterator_next) */
-       c_avl_iterator_destroy (iter);
+static _Bool check_send_okay (const value_list_t *vl) /* {{{ */
+{
+  _Bool received = false;
+  int status;
 
-       for (i = 0; i < keys_num; i++)
-       {
-               if (c_avl_remove (cache_tree, keys[i], (void *) &key,
-                                       (void *) &value) != 0)
-               {
-                       WARNING ("network plugin: cache_flush: c_avl_remove"
-                                       " (%s) failed.", keys[i]);
-                       continue;
-               }
+  if (network_config_forward != 0)
+    return (true);
 
-               sfree (key);
-               sfree (value);
-       }
+  if (vl->meta == NULL)
+    return (true);
 
-       sfree (keys);
+  status = meta_data_get_boolean (vl->meta, "network:received", &received);
+  if (status == -ENOENT)
+    return (true);
+  else if (status != 0)
+  {
+    ERROR ("network plugin: check_send_okay: meta_data_get_boolean failed "
+       "with status %i.", status);
+    return (true);
+  }
 
-       DEBUG ("network plugin: cache_flush: Removed %i %s",
-                       keys_num, (keys_num == 1) ? "entry" : "entries");
-       cache_flush_last = curtime;
-       return (0);
-} /* int cache_flush */
+  /* By default, only *send* value lists that were not *received* by the
+   * network plugin. */
+  return (!received);
+} /* }}} _Bool check_send_okay */
 
-static int cache_check (const value_list_t *vl)
+static int network_dispatch_values (value_list_t *vl) /* {{{ */
 {
-       char key[1024];
-       time_t *value = NULL;
-       int retval = -1;
+  int status;
 
-       if (cache_tree == NULL)
-               return (-1);
+  if ((vl->time <= 0)
+      || (strlen (vl->host) <= 0)
+      || (strlen (vl->plugin) <= 0)
+      || (strlen (vl->type) <= 0))
+    return (-EINVAL);
 
-       if (format_name (key, sizeof (key), vl->host, vl->plugin,
-                               vl->plugin_instance, vl->type, vl->type_instance))
-               return (-1);
+  if (!check_receive_okay (vl))
+  {
+#if COLLECT_DEBUG
+         char name[6*DATA_MAX_NAME_LEN];
+         FORMAT_VL (name, sizeof (name), vl);
+         name[sizeof (name) - 1] = 0;
+         DEBUG ("network plugin: network_dispatch_values: "
+             "NOT dispatching %s.", name);
+#endif
+    return (0);
+  }
 
-       pthread_mutex_lock (&cache_lock);
+  assert (vl->meta == NULL);
 
-       if (c_avl_get (cache_tree, key, (void *) &value) == 0)
-       {
-               if (*value < vl->time)
-               {
-                       *value = vl->time;
-                       retval = 0;
-               }
-               else
-               {
-                       DEBUG ("network plugin: cache_check: *value = %i >= vl->time = %i",
-                                       (int) *value, (int) vl->time);
-                       retval = 1;
-               }
-       }
-       else
-       {
-               char *key_copy = strdup (key);
-               value = malloc (sizeof (time_t));
-               if ((key_copy != NULL) && (value != NULL))
-               {
-                       *value = vl->time;
-                       c_avl_insert (cache_tree, key_copy, value);
-                       retval = 0;
-               }
-               else
-               {
-                       sfree (key_copy);
-                       sfree (value);
-               }
-       }
+  vl->meta = meta_data_create ();
+  if (vl->meta == NULL)
+  {
+    ERROR ("network plugin: meta_data_create failed.");
+    return (-ENOMEM);
+  }
 
-       if ((time (NULL) - cache_flush_last) > cache_flush_interval)
-               cache_flush ();
+  status = meta_data_add_boolean (vl->meta, "network:received", true);
+  if (status != 0)
+  {
+    ERROR ("network plugin: meta_data_add_boolean failed.");
+    meta_data_destroy (vl->meta);
+    vl->meta = NULL;
+    return (status);
+  }
 
-       pthread_mutex_unlock (&cache_lock);
+  plugin_dispatch_values (vl);
 
-       return (retval);
-} /* int cache_check */
+  meta_data_destroy (vl->meta);
+  vl->meta = NULL;
+
+  return (0);
+} /* }}} int network_dispatch_values */
 
 #if HAVE_LIBGCRYPT
 static gcry_cipher_hd_t network_get_aes256_cypher (sockent_t *se, /* {{{ */
@@ -1341,23 +1313,10 @@ static int parse_packet (sockent_t *se, /* {{{ */
                {
                        status = parse_part_values (&buffer, &buffer_size,
                                        &vl.values, &vl.values_len);
-
                        if (status != 0)
                                break;
 
-                       if ((vl.time > 0)
-                                       && (strlen (vl.host) > 0)
-                                       && (strlen (vl.plugin) > 0)
-                                       && (strlen (vl.type) > 0)
-                                       && (cache_check (&vl) == 0))
-                       {
-                               plugin_dispatch_values (&vl);
-                       }
-                       else
-                       {
-                               DEBUG ("network plugin: parse_packet:"
-                                               " NOT dispatching values");
-                       }
+                       network_dispatch_values (&vl);
 
                        sfree (vl.values);
                }
@@ -2473,13 +2432,20 @@ static int network_write (const data_set_t *ds, const value_list_t *vl,
 {
        int status;
 
-       /* If the value is already in the cache, we have received it via the
-        * network. We write it again if forwarding is activated. It's then in
-        * the cache and should we receive it again we will ignore it. */
-       status = cache_check (vl);
-       if ((network_config_forward == 0)
-                       && (status != 0))
-               return (0);
+       if (!check_send_okay (vl))
+       {
+#if COLLECT_DEBUG
+         char name[6*DATA_MAX_NAME_LEN];
+         FORMAT_VL (name, sizeof (name), vl);
+         name[sizeof (name) - 1] = 0;
+         DEBUG ("network plugin: network_write: "
+             "NOT sending %s.", name);
+#endif
+         return (0);
+       }
+
+       uc_meta_data_add_unsigned_int (vl,
+           "network:time_sent", (uint64_t) vl->time);
 
        pthread_mutex_lock (&send_buffer_lock);
 
@@ -2794,24 +2760,6 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */
   return (0);
 } /* }}} int network_config_add_server */
 
-static int network_config_set_cache_flush (const oconfig_item_t *ci) /* {{{ */
-{
-  int tmp;
-  if ((ci->values_num != 1)
-      || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
-  {
-    WARNING ("network plugin: The `CacheFlush' config option needs exactly "
-        "one numeric argument.");
-    return (-1);
-  }
-
-  tmp = (int) ci->values[0].value.number;
-  if (tmp > 0)
-    network_config_ttl = tmp;
-
-  return (0);
-} /* }}} int network_config_set_cache_flush */
-
 static int network_config (oconfig_item_t *ci) /* {{{ */
 {
   int i;
@@ -2829,7 +2777,7 @@ static int network_config (oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp ("Forward", child->key) == 0)
       network_config_set_boolean (child, &network_config_forward);
     else if (strcasecmp ("CacheFlush", child->key) == 0)
-      network_config_set_cache_flush (child);
+      /* no op for backwards compatibility only */;
     else
     {
       WARNING ("network plugin: Option `%s' is not allowed here.",
@@ -2942,20 +2890,6 @@ static int network_shutdown (void)
        if (send_buffer_fill > 0)
                flush_buffer ();
 
-       if (cache_tree != NULL)
-       {
-               void *key;
-               void *value;
-
-               while (c_avl_pick (cache_tree, &key, &value) == 0)
-               {
-                       sfree (key);
-                       sfree (value);
-               }
-               c_avl_destroy (cache_tree);
-               cache_tree = NULL;
-       }
-
        /* TODO: Close `sending_sockets' */
 
        plugin_unregister_config ("network");
@@ -2963,26 +2897,23 @@ static int network_shutdown (void)
        plugin_unregister_write ("network");
        plugin_unregister_shutdown ("network");
 
-       /* Let the init function do it's move again ;) */
-       cache_flush_last = 0;
-
        return (0);
 } /* int network_shutdown */
 
 static int network_init (void)
 {
+       static _Bool have_init = false;
+
        /* Check if we were already initialized. If so, just return - there's
         * nothing more to do (for now, that is). */
-       if (cache_flush_last != 0)
+       if (have_init)
                return (0);
+       have_init = true;
 
        plugin_register_shutdown ("network", network_shutdown);
 
        network_init_buffer ();
 
-       cache_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp);
-       cache_flush_last = time (NULL);
-
        /* setup socket(s) and so on */
        if (sending_sockets != NULL)
        {
@@ -3054,11 +2985,8 @@ static int network_flush (int timeout,
 {
        pthread_mutex_lock (&send_buffer_lock);
 
-       if (((time (NULL) - cache_flush_last) >= timeout)
-                       && (send_buffer_fill > 0))
-       {
-               flush_buffer ();
-       }
+       if (send_buffer_fill > 0)
+         flush_buffer ();
 
        pthread_mutex_unlock (&send_buffer_lock);