src/rrdd.[ch]: Implemented flushing of dead values once in a while. master
authorFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 22 Jun 2008 13:51:11 +0000 (15:51 +0200)
committerFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 22 Jun 2008 13:51:11 +0000 (15:51 +0200)
src/rrdd.c
src/rrdd.h

index 272b859..03b9bbc 100644 (file)
@@ -96,22 +96,114 @@ static void sig_term_handler (int signal) /* {{{ */
   do_shutdown++;
 } /* }}} void sig_term_handler */
 
+/*
+ * enqueue_cache_item:
+ * `cache_lock' must be acquired before calling this function!
+ */
+static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
+{
+  RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
+      ci->file);
+
+  if (ci == NULL)
+    return (-1);
+
+  if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+    return (-1);
+
+  assert (ci->next == NULL);
+
+  if (cache_queue_tail == NULL)
+    cache_queue_head = ci;
+  else
+    cache_queue_tail->next = ci;
+  cache_queue_tail = ci;
+
+  return (0);
+} /* }}} int enqueue_cache_item */
+
+/*
+ * tree_callback_flush:
+ * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
+ * while this is in progress.
+ */
+static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
+    gpointer data)
+{
+  cache_item_t *ci;
+  time_t now;
+
+  ci = (cache_item_t *) value;
+  now = *((time_t *) data);
+
+  if (((now - ci->last_flush_time) >= config_write_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
+    enqueue_cache_item (ci);
+
+  return (TRUE);
+} /* }}} gboolean tree_callback_flush */
+
 static void *queue_thread_main (void *args) /* {{{ */
 {
+  struct timeval now;
+  struct timespec next_flush;
+
+  gettimeofday (&now, NULL);
+  next_flush.tv_sec = now.tv_sec + config_flush_interval;
+  next_flush.tv_nsec = 1000 * now.tv_usec;
+
   pthread_mutex_lock (&cache_lock);
   while ((do_shutdown == 0) || (cache_queue_head != NULL))
   {
     cache_item_t *ci;
-
     char *file;
     char **values;
     int values_num;
     int status;
     int i;
 
+    /* First, check if it's time to do the cache flush. */
+    gettimeofday (&now, NULL);
+    if ((now.tv_sec > next_flush.tv_sec)
+        || ((now.tv_sec == next_flush.tv_sec)
+          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
+    {
+      time_t time_now;
+
+      /* Pass the current time as user data so that we don't need to call
+       * `time' for each node. */
+      time_now = time (NULL);
+
+      g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
+
+      /* Determine the time of the next cache flush. */
+      while (next_flush.tv_sec < now.tv_sec)
+        next_flush.tv_sec += config_flush_interval;
+    }
+
+    /* Now, check if there's something to store away. If not, wait until
+     * something comes in or it's time to do the cache flush. */
     if (cache_queue_head == NULL)
-      pthread_cond_wait (&cache_cond, &cache_lock);
+    {
+      struct timespec timeout;
 
+      timeout.tv_sec = next_flush.tv_sec - now.tv_sec;
+      if (next_flush.tv_nsec < (1000 * now.tv_usec))
+      {
+        timeout.tv_sec--;
+        timeout.tv_nsec = 1000000000 + next_flush.tv_nsec
+          - (1000 * now.tv_usec);
+      }
+      else
+      {
+        timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec);
+      }
+
+      pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout);
+    }
+
+    /* Check if a value has arrived. This may be NULL if we timed out or there
+     * was an interrupt such as a signal. */
     if (cache_queue_head == NULL)
       continue;
 
@@ -254,17 +346,7 @@ static int handle_request_update (int fd, /* {{{ */
   if (((now - ci->last_flush_time) >= config_write_interval)
       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
   {
-    RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
-        ci->file);
-
-    assert (ci->next == NULL);
-
-    if (cache_queue_tail == NULL)
-      cache_queue_head = ci;
-    else
-      cache_queue_tail->next = ci;
-    cache_queue_tail = ci;
-
+    enqueue_cache_item (ci);
     pthread_cond_signal (&cache_cond);
   }
 
index 21801b3..75c1e1b 100644 (file)
@@ -68,6 +68,8 @@
 #include <pthread.h>
 #include <errno.h>
 #include <assert.h>
+#include <sys/time.h>
+#include <time.h>
 
 #include "config.h"