From 27aec36864d1882bf628b9c71c233d70e994a202 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Tue, 4 Aug 2009 13:02:57 +0200 Subject: [PATCH] network plugin: Use the meta data to implement the `Forward' option. Previously, a cache in the network plugin was used to keep track of which values were received via the network in order to distinguish between ``forwarded'' values and values that were received from somewhere else. The same cache was also used to avoid loops when forwarding packages by keeping track of the highest timestamp that was sent by the plugin and discard received data that was older or as old as that. This information is not kept in the meta data of the global cache (what is the last timestamp sent) and the meta data of the value list (was this value list received via the network?). The cache that was maintained in the network plugin has been removed. --- src/network.c | 260 +++++++++++++++++++++------------------------------------- 1 file changed, 94 insertions(+), 166 deletions(-) diff --git a/src/network.c b/src/network.c index 75f52df6..0e416bd5 100644 --- a/src/network.c +++ b/src/network.c @@ -27,6 +27,7 @@ #include "configfile.h" #include "utils_fbhash.h" #include "utils_avltree.h" +#include "utils_cache.h" #include "network.h" @@ -283,127 +284,98 @@ static int send_buffer_fill; static value_list_t send_buffer_vl = VALUE_LIST_STATIC; static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; -/* In this cache we store all the values we received, so we can send out only - * those values which were *not* received via the network plugin, too. This is - * used for the `Forward false' option. */ -static c_avl_tree_t *cache_tree = NULL; -static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; -static time_t cache_flush_last = 0; -static int cache_flush_interval = 1800; - /* * Private functions */ -static int cache_flush (void) +static _Bool check_receive_okay (const value_list_t *vl) /* {{{ */ { - char **keys = NULL; - int keys_num = 0; + uint64_t time_sent = 0; + int status; - char **tmp; - int i; + status = uc_meta_data_get_unsigned_int (vl, + "network:time_sent", &time_sent); - char *key; - time_t *value; - c_avl_iterator_t *iter; + /* This is a value we already sent. Don't allow it to be received again in + * order to avoid looping. */ + if ((status == 0) && (time_sent >= ((uint64_t) vl->time))) + return (false); - time_t curtime = time (NULL); + return (true); +} /* }}} _Bool check_receive_okay */ - iter = c_avl_get_iterator (cache_tree); - while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0) - { - if ((curtime - *value) <= cache_flush_interval) - continue; - tmp = (char **) realloc (keys, - (keys_num + 1) * sizeof (char *)); - if (tmp == NULL) - { - sfree (keys); - c_avl_iterator_destroy (iter); - ERROR ("network plugin: cache_flush: realloc" - " failed."); - return (-1); - } - keys = tmp; - keys[keys_num] = key; - keys_num++; - } /* while (c_avl_iterator_next) */ - c_avl_iterator_destroy (iter); +static _Bool check_send_okay (const value_list_t *vl) /* {{{ */ +{ + _Bool received = false; + int status; - for (i = 0; i < keys_num; i++) - { - if (c_avl_remove (cache_tree, keys[i], (void *) &key, - (void *) &value) != 0) - { - WARNING ("network plugin: cache_flush: c_avl_remove" - " (%s) failed.", keys[i]); - continue; - } + if (network_config_forward != 0) + return (true); - sfree (key); - sfree (value); - } + if (vl->meta == NULL) + return (true); - sfree (keys); + status = meta_data_get_boolean (vl->meta, "network:received", &received); + if (status == -ENOENT) + return (true); + else if (status != 0) + { + ERROR ("network plugin: check_send_okay: meta_data_get_boolean failed " + "with status %i.", status); + return (true); + } - DEBUG ("network plugin: cache_flush: Removed %i %s", - keys_num, (keys_num == 1) ? "entry" : "entries"); - cache_flush_last = curtime; - return (0); -} /* int cache_flush */ + /* By default, only *send* value lists that were not *received* by the + * network plugin. */ + return (!received); +} /* }}} _Bool check_send_okay */ -static int cache_check (const value_list_t *vl) +static int network_dispatch_values (value_list_t *vl) /* {{{ */ { - char key[1024]; - time_t *value = NULL; - int retval = -1; + int status; - if (cache_tree == NULL) - return (-1); + if ((vl->time <= 0) + || (strlen (vl->host) <= 0) + || (strlen (vl->plugin) <= 0) + || (strlen (vl->type) <= 0)) + return (-EINVAL); - if (format_name (key, sizeof (key), vl->host, vl->plugin, - vl->plugin_instance, vl->type, vl->type_instance)) - return (-1); + if (!check_receive_okay (vl)) + { +#if COLLECT_DEBUG + char name[6*DATA_MAX_NAME_LEN]; + FORMAT_VL (name, sizeof (name), vl); + name[sizeof (name) - 1] = 0; + DEBUG ("network plugin: network_dispatch_values: " + "NOT dispatching %s.", name); +#endif + return (0); + } - pthread_mutex_lock (&cache_lock); + assert (vl->meta == NULL); - if (c_avl_get (cache_tree, key, (void *) &value) == 0) - { - if (*value < vl->time) - { - *value = vl->time; - retval = 0; - } - else - { - DEBUG ("network plugin: cache_check: *value = %i >= vl->time = %i", - (int) *value, (int) vl->time); - retval = 1; - } - } - else - { - char *key_copy = strdup (key); - value = malloc (sizeof (time_t)); - if ((key_copy != NULL) && (value != NULL)) - { - *value = vl->time; - c_avl_insert (cache_tree, key_copy, value); - retval = 0; - } - else - { - sfree (key_copy); - sfree (value); - } - } + vl->meta = meta_data_create (); + if (vl->meta == NULL) + { + ERROR ("network plugin: meta_data_create failed."); + return (-ENOMEM); + } - if ((time (NULL) - cache_flush_last) > cache_flush_interval) - cache_flush (); + status = meta_data_add_boolean (vl->meta, "network:received", true); + if (status != 0) + { + ERROR ("network plugin: meta_data_add_boolean failed."); + meta_data_destroy (vl->meta); + vl->meta = NULL; + return (status); + } - pthread_mutex_unlock (&cache_lock); + plugin_dispatch_values (vl); - return (retval); -} /* int cache_check */ + meta_data_destroy (vl->meta); + vl->meta = NULL; + + return (0); +} /* }}} int network_dispatch_values */ #if HAVE_LIBGCRYPT static gcry_cipher_hd_t network_get_aes256_cypher (sockent_t *se, /* {{{ */ @@ -1341,23 +1313,10 @@ static int parse_packet (sockent_t *se, /* {{{ */ { status = parse_part_values (&buffer, &buffer_size, &vl.values, &vl.values_len); - if (status != 0) break; - if ((vl.time > 0) - && (strlen (vl.host) > 0) - && (strlen (vl.plugin) > 0) - && (strlen (vl.type) > 0) - && (cache_check (&vl) == 0)) - { - plugin_dispatch_values (&vl); - } - else - { - DEBUG ("network plugin: parse_packet:" - " NOT dispatching values"); - } + network_dispatch_values (&vl); sfree (vl.values); } @@ -2473,13 +2432,20 @@ static int network_write (const data_set_t *ds, const value_list_t *vl, { int status; - /* If the value is already in the cache, we have received it via the - * network. We write it again if forwarding is activated. It's then in - * the cache and should we receive it again we will ignore it. */ - status = cache_check (vl); - if ((network_config_forward == 0) - && (status != 0)) - return (0); + if (!check_send_okay (vl)) + { +#if COLLECT_DEBUG + char name[6*DATA_MAX_NAME_LEN]; + FORMAT_VL (name, sizeof (name), vl); + name[sizeof (name) - 1] = 0; + DEBUG ("network plugin: network_write: " + "NOT sending %s.", name); +#endif + return (0); + } + + uc_meta_data_add_unsigned_int (vl, + "network:time_sent", (uint64_t) vl->time); pthread_mutex_lock (&send_buffer_lock); @@ -2794,24 +2760,6 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */ return (0); } /* }}} int network_config_add_server */ -static int network_config_set_cache_flush (const oconfig_item_t *ci) /* {{{ */ -{ - int tmp; - if ((ci->values_num != 1) - || (ci->values[0].type != OCONFIG_TYPE_NUMBER)) - { - WARNING ("network plugin: The `CacheFlush' config option needs exactly " - "one numeric argument."); - return (-1); - } - - tmp = (int) ci->values[0].value.number; - if (tmp > 0) - network_config_ttl = tmp; - - return (0); -} /* }}} int network_config_set_cache_flush */ - static int network_config (oconfig_item_t *ci) /* {{{ */ { int i; @@ -2829,7 +2777,7 @@ static int network_config (oconfig_item_t *ci) /* {{{ */ else if (strcasecmp ("Forward", child->key) == 0) network_config_set_boolean (child, &network_config_forward); else if (strcasecmp ("CacheFlush", child->key) == 0) - network_config_set_cache_flush (child); + /* no op for backwards compatibility only */; else { WARNING ("network plugin: Option `%s' is not allowed here.", @@ -2942,20 +2890,6 @@ static int network_shutdown (void) if (send_buffer_fill > 0) flush_buffer (); - if (cache_tree != NULL) - { - void *key; - void *value; - - while (c_avl_pick (cache_tree, &key, &value) == 0) - { - sfree (key); - sfree (value); - } - c_avl_destroy (cache_tree); - cache_tree = NULL; - } - /* TODO: Close `sending_sockets' */ plugin_unregister_config ("network"); @@ -2963,26 +2897,23 @@ static int network_shutdown (void) plugin_unregister_write ("network"); plugin_unregister_shutdown ("network"); - /* Let the init function do it's move again ;) */ - cache_flush_last = 0; - return (0); } /* int network_shutdown */ static int network_init (void) { + static _Bool have_init = false; + /* Check if we were already initialized. If so, just return - there's * nothing more to do (for now, that is). */ - if (cache_flush_last != 0) + if (have_init) return (0); + have_init = true; plugin_register_shutdown ("network", network_shutdown); network_init_buffer (); - cache_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp); - cache_flush_last = time (NULL); - /* setup socket(s) and so on */ if (sending_sockets != NULL) { @@ -3054,11 +2985,8 @@ static int network_flush (int timeout, { pthread_mutex_lock (&send_buffer_lock); - if (((time (NULL) - cache_flush_last) >= timeout) - && (send_buffer_fill > 0)) - { - flush_buffer (); - } + if (send_buffer_fill > 0) + flush_buffer (); pthread_mutex_unlock (&send_buffer_lock); -- 2.11.0