From: Florian Forster Date: Sun, 22 Jun 2008 13:51:11 +0000 (+0200) Subject: src/rrdd.[ch]: Implemented flushing of dead values once in a while. X-Git-Url: https://git.octo.it/?p=rrdd.git;a=commitdiff_plain;h=057ea5766358de9f0a821bad3a5ab86ee8886e71 src/rrdd.[ch]: Implemented flushing of dead values once in a while. --- diff --git a/src/rrdd.c b/src/rrdd.c index 272b859..03b9bbc 100644 --- a/src/rrdd.c +++ b/src/rrdd.c @@ -96,22 +96,114 @@ static void sig_term_handler (int signal) /* {{{ */ do_shutdown++; } /* }}} void sig_term_handler */ +/* + * enqueue_cache_item: + * `cache_lock' must be acquired before calling this function! + */ +static int enqueue_cache_item (cache_item_t *ci) /* {{{ */ +{ + RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", + ci->file); + + if (ci == NULL) + return (-1); + + if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + return (-1); + + assert (ci->next == NULL); + + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; + cache_queue_tail = ci; + + return (0); +} /* }}} int enqueue_cache_item */ + +/* + * tree_callback_flush: + * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held + * while this is in progress. + */ +static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ + gpointer data) +{ + cache_item_t *ci; + time_t now; + + ci = (cache_item_t *) value; + now = *((time_t *) data); + + if (((now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + enqueue_cache_item (ci); + + return (TRUE); +} /* }}} gboolean tree_callback_flush */ + static void *queue_thread_main (void *args) /* {{{ */ { + struct timeval now; + struct timespec next_flush; + + gettimeofday (&now, NULL); + next_flush.tv_sec = now.tv_sec + config_flush_interval; + next_flush.tv_nsec = 1000 * now.tv_usec; + pthread_mutex_lock (&cache_lock); while ((do_shutdown == 0) || (cache_queue_head != NULL)) { cache_item_t *ci; - char *file; char **values; int values_num; int status; int i; + /* First, check if it's time to do the cache flush. */ + gettimeofday (&now, NULL); + if ((now.tv_sec > next_flush.tv_sec) + || ((now.tv_sec == next_flush.tv_sec) + && ((1000 * now.tv_usec) > next_flush.tv_nsec))) + { + time_t time_now; + + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + time_now = time (NULL); + + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now); + + /* Determine the time of the next cache flush. */ + while (next_flush.tv_sec < now.tv_sec) + next_flush.tv_sec += config_flush_interval; + } + + /* Now, check if there's something to store away. If not, wait until + * something comes in or it's time to do the cache flush. */ if (cache_queue_head == NULL) - pthread_cond_wait (&cache_cond, &cache_lock); + { + struct timespec timeout; + timeout.tv_sec = next_flush.tv_sec - now.tv_sec; + if (next_flush.tv_nsec < (1000 * now.tv_usec)) + { + timeout.tv_sec--; + timeout.tv_nsec = 1000000000 + next_flush.tv_nsec + - (1000 * now.tv_usec); + } + else + { + timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec); + } + + pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout); + } + + /* Check if a value has arrived. This may be NULL if we timed out or there + * was an interrupt such as a signal. */ if (cache_queue_head == NULL) continue; @@ -254,17 +346,7 @@ static int handle_request_update (int fd, /* {{{ */ if (((now - ci->last_flush_time) >= config_write_interval) && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) { - RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", - ci->file); - - assert (ci->next == NULL); - - if (cache_queue_tail == NULL) - cache_queue_head = ci; - else - cache_queue_tail->next = ci; - cache_queue_tail = ci; - + enqueue_cache_item (ci); pthread_cond_signal (&cache_cond); } diff --git a/src/rrdd.h b/src/rrdd.h index 21801b3..75c1e1b 100644 --- a/src/rrdd.h +++ b/src/rrdd.h @@ -68,6 +68,8 @@ #include #include #include +#include +#include #include "config.h"