src/rrd_daemon.c: Flush ALL values when/before shutting down.
authorFlorian Forster <octo@verplant.org>
Fri, 4 Jul 2008 10:19:14 +0000 (12:19 +0200)
committerFlorian Forster <octo@leeloo.home.verplant.org>
Tue, 15 Jul 2008 18:23:53 +0000 (20:23 +0200)
src/rrd_daemon.c

index dfdd0e3..bc299f8 100644 (file)
@@ -122,6 +122,7 @@ struct cache_item_s
 struct callback_flush_data_s
 {
   time_t now;
+  time_t abs_timeout;
   char **keys;
   size_t keys_num;
 };
@@ -396,7 +397,13 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
   ci = (cache_item_t *) value;
   cfd = (callback_flush_data_t *) data;
 
-  if (((cfd->now - ci->last_flush_time) >= config_write_interval)
+  if ((ci->last_flush_time <= cfd->abs_timeout)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+  }
+  else if ((do_shutdown != 0)
       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
       && (ci->values_num > 0))
   {
@@ -425,6 +432,61 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
   return (FALSE);
 } /* }}} gboolean tree_callback_flush */
 
+static int flush_old_values (int max_age)
+{
+  callback_flush_data_t cfd;
+  size_t k;
+
+  memset (&cfd, 0, sizeof (cfd));
+  /* Pass the current time as user data so that we don't need to call
+   * `time' for each node. */
+  cfd.now = time (NULL);
+  cfd.keys = NULL;
+  cfd.keys_num = 0;
+
+  if (max_age > 0)
+    cfd.abs_timeout = cfd.now - max_age;
+  else
+    cfd.abs_timeout = cfd.now + 1;
+
+  /* `tree_callback_flush' will return the keys of all values that haven't
+   * been touched in the last `config_flush_interval' seconds in `cfd'.
+   * The char*'s in this array point to the same memory as ci->file, so we
+   * don't need to free them separately. */
+  g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
+
+  for (k = 0; k < cfd.keys_num; k++)
+  {
+    cache_item_t *ci;
+
+    /* This must not fail. */
+    ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
+    assert (ci != NULL);
+
+    /* If we end up here with values available, something's seriously
+     * messed up. */
+    assert (ci->values_num == 0);
+
+    /* Remove the node from the tree */
+    g_tree_remove (cache_tree, cfd.keys[k]);
+    cfd.keys[k] = NULL;
+
+    /* Now free and clean up `ci'. */
+    free (ci->file);
+    ci->file = NULL;
+    free (ci);
+    ci = NULL;
+  } /* for (k = 0; k < cfd.keys_num; k++) */
+
+  if (cfd.keys != NULL)
+  {
+    free (cfd.keys);
+    cfd.keys = NULL;
+  }
+
+  return (0);
+} /* int flush_old_values */
+
 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
 {
   struct timeval now;
@@ -450,48 +512,9 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
         || ((now.tv_sec == next_flush.tv_sec)
           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
     {
-      callback_flush_data_t cfd;
-      size_t k;
-
-      memset (&cfd, 0, sizeof (cfd));
-      /* Pass the current time as user data so that we don't need to call
-       * `time' for each node. */
-      cfd.now = time (NULL);
-      cfd.keys = NULL;
-      cfd.keys_num = 0;
-
-      /* `tree_callback_flush' will return the keys of all values that haven't
-       * been touched in the last `config_flush_interval' seconds in `cfd'.
-       * The char*'s in this array point to the same memory as ci->file, so we
-       * don't need to free them separately. */
-      g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
-
-      for (k = 0; k < cfd.keys_num; k++)
-      {
-        /* This must not fail. */
-        ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
-        assert (ci != NULL);
-
-        /* If we end up here with values available, something's seriously
-         * messed up. */
-        assert (ci->values_num == 0);
-
-        /* Remove the node from the tree */
-        g_tree_remove (cache_tree, cfd.keys[k]);
-        cfd.keys[k] = NULL;
-
-        /* Now free and clean up `ci'. */
-        free (ci->file);
-        ci->file = NULL;
-        free (ci);
-        ci = NULL;
-      } /* for (k = 0; k < cfd.keys_num; k++) */
-
-      if (cfd.keys != NULL)
-      {
-        free (cfd.keys);
-        cfd.keys = NULL;
-      }
+      /* Flush all values that haven't been written in the last
+       * `config_write_interval' seconds. */
+      flush_old_values (config_write_interval);
 
       /* Determine the time of the next cache flush. */
       while (next_flush.tv_sec < now.tv_sec)
@@ -510,6 +533,10 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
       }
     }
 
+    /* We're about to shut down, so lets flush the entire tree. */
+    if ((do_shutdown != 0) && (cache_queue_head == NULL))
+      flush_old_values (/* max age = */ -1);
+
     /* Check if a value has arrived. This may be NULL if we timed out or there
      * was an interrupt such as a signal. */
     if (cache_queue_head == NULL)
@@ -568,7 +595,11 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
 
     pthread_mutex_lock (&cache_lock);
     pthread_cond_broadcast (&flush_cond);
-  } /* while (do_shutdown == 0) */
+
+    /* We're about to shut down, so lets flush the entire tree. */
+    if ((do_shutdown != 0) && (cache_queue_head == NULL))
+      flush_old_values (/* max age = */ -1);
+  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
   pthread_mutex_unlock (&cache_lock);
 
   return (NULL);