src/rrd_daemon.c: Impemented the `stats' command.
[rrdtool.git] / src / rrd_daemon.c
index 87ac8e9..024738b 100644 (file)
@@ -66,6 +66,8 @@
 #include <stdio.h>
 #include <unistd.h>
 #include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
 
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -116,6 +118,14 @@ struct cache_item_s
   cache_item_t *next;
 };
 
+struct callback_flush_data_s
+{
+  time_t now;
+  char **keys;
+  size_t keys_num;
+};
+typedef struct callback_flush_data_s callback_flush_data_t;
+
 enum queue_side_e
 {
   HEAD,
@@ -154,6 +164,11 @@ static char *config_base_dir = NULL;
 static char **config_listen_address_list = NULL;
 static int config_listen_address_list_len = 0;
 
+static uint64_t stats_queue_length = 0;
+static uint64_t stats_updates_total = 0;
+static uint64_t stats_values_total = 0;
+static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
+
 /* 
  * Functions
  */
@@ -214,6 +229,8 @@ static int remove_pidfile (void) /* {{{ */
 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
     queue_side_t side)
 {
+  int did_insert = 0;
+
   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
       ci->file);
 
@@ -233,6 +250,8 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
 
       if (cache_queue_tail == NULL)
         cache_queue_tail = cache_queue_head;
+
+      did_insert = 1;
     }
     else if (cache_queue_head == ci)
     {
@@ -271,10 +290,19 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
     else
       cache_queue_tail->next = ci;
     cache_queue_tail = ci;
+
+    did_insert = 1;
   }
 
   ci->flags |= CI_FLAGS_IN_QUEUE;
 
+  if (did_insert)
+  {
+    pthread_mutex_lock (&stats_lock);
+    stats_queue_length++;
+    pthread_mutex_unlock (&stats_lock);
+  }
+
   return (0);
 } /* }}} int enqueue_cache_item */
 
@@ -283,23 +311,42 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
  * while this is in progress.
  */
-static gboolean tree_callback_flush (gpointer key /* {{{ */
-    __attribute__((unused)), gpointer value, gpointer data)
+static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
+    gpointer data)
 {
   cache_item_t *ci;
-  time_t now;
-
-  key = NULL; /* make compiler happy */
+  callback_flush_data_t *cfd;
 
   ci = (cache_item_t *) value;
-  now = *((time_t *) data);
+  cfd = (callback_flush_data_t *) data;
 
-  if (((now - ci->last_flush_time) >= config_write_interval)
+  if (((cfd->now - ci->last_flush_time) >= config_write_interval)
       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
       && (ci->values_num > 0))
+  {
     enqueue_cache_item (ci, TAIL);
+  }
+  else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num <= 0))
+  {
+    char **temp;
 
-  return (TRUE);
+    temp = (char **) realloc (cfd->keys,
+        sizeof (char *) * (cfd->keys_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
+      return (FALSE);
+    }
+    cfd->keys = temp;
+    /* Make really sure this points to the _same_ place */
+    assert ((char *) key == ci->file);
+    cfd->keys[cfd->keys_num] = (char *) key;
+    cfd->keys_num++;
+  }
+
+  return (FALSE);
 } /* }}} gboolean tree_callback_flush */
 
 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
@@ -327,13 +374,48 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
         || ((now.tv_sec == next_flush.tv_sec)
           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
     {
-      time_t time_now;
+      callback_flush_data_t cfd;
+      size_t k;
 
+      memset (&cfd, 0, sizeof (cfd));
       /* Pass the current time as user data so that we don't need to call
        * `time' for each node. */
-      time_now = time (NULL);
+      cfd.now = time (NULL);
+      cfd.keys = NULL;
+      cfd.keys_num = 0;
+
+      /* `tree_callback_flush' will return the keys of all values that haven't
+       * been touched in the last `config_flush_interval' seconds in `cfd'.
+       * The char*'s in this array point to the same memory as ci->file, so we
+       * don't need to free them separately. */
+      g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
 
-      g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
+      for (k = 0; k < cfd.keys_num; k++)
+      {
+        /* This must not fail. */
+        ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
+        assert (ci != NULL);
+
+        /* If we end up here with values available, something's seriously
+         * messed up. */
+        assert (ci->values_num == 0);
+
+        /* Remove the node from the tree */
+        g_tree_remove (cache_tree, cfd.keys[k]);
+        cfd.keys[k] = NULL;
+
+        /* Now free and clean up `ci'. */
+        free (ci->file);
+        ci->file = NULL;
+        free (ci);
+        ci = NULL;
+      } /* for (k = 0; k < cfd.keys_num; k++) */
+
+      if (cfd.keys != NULL)
+      {
+        free (cfd.keys);
+        cfd.keys = NULL;
+      }
 
       /* Determine the time of the next cache flush. */
       while (next_flush.tv_sec < now.tv_sec)
@@ -381,6 +463,11 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
       cache_queue_tail = NULL;
     ci->next = NULL;
 
+    pthread_mutex_lock (&stats_lock);
+    assert (stats_queue_length > 0);
+    stats_queue_length--;
+    pthread_mutex_unlock (&stats_lock);
+
     pthread_mutex_unlock (&cache_lock);
 
     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
@@ -398,6 +485,11 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
     for (i = 0; i < values_num; i++)
       free (values[i]);
 
+    pthread_mutex_lock (&stats_lock);
+    stats_updates_total++;
+    stats_values_total += values_num;
+    pthread_mutex_unlock (&stats_lock);
+
     pthread_mutex_lock (&cache_lock);
     pthread_cond_broadcast (&flush_cond);
   } /* while (do_shutdown == 0) */
@@ -474,7 +566,7 @@ static int flush_file (const char *filename) /* {{{ */
 
   pthread_mutex_lock (&cache_lock);
 
-  ci = g_tree_lookup (cache_tree, filename);
+  ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
   if (ci == NULL)
   {
     pthread_mutex_unlock (&cache_lock);
@@ -505,6 +597,67 @@ static int flush_file (const char *filename) /* {{{ */
   return (0);
 } /* }}} int flush_file */
 
+static int handle_request_stats (int fd, /* {{{ */
+    char *buffer __attribute__((unused)),
+    size_t buffer_size __attribute__((unused)))
+{
+  int status;
+  char outbuf[4096];
+
+  uint64_t copy_queue_length;
+  uint64_t copy_updates_total;
+  uint64_t copy_values_total;
+
+  uint64_t tree_nodes;
+  uint64_t tree_depth;
+
+  pthread_mutex_lock (&stats_lock);
+  copy_queue_length  = stats_queue_length;
+  copy_updates_total = stats_updates_total;
+  copy_values_total  = stats_values_total;
+  pthread_mutex_unlock (&stats_lock);
+
+  pthread_mutex_lock (&cache_lock);
+  tree_nodes = (uint64_t) g_tree_nnodes (cache_tree);
+  tree_depth = (uint64_t) g_tree_height (cache_tree);
+  pthread_mutex_unlock (&cache_lock);
+
+#define RRDD_STATS_SEND \
+  outbuf[sizeof (outbuf) - 1] = 0; \
+  status = write (fd, outbuf, strlen (outbuf)); \
+  if (status < 0) \
+  { \
+    status = errno; \
+    RRDD_LOG (LOG_INFO, "handle_request_stats: write(2) returned an error."); \
+    return (status); \
+  }
+
+  strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf));
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "QueueLength: %"PRIu64"\n", copy_queue_length);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "UpdatesWritten: %"PRIu64"\n", copy_updates_total);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "ValuesWritten: %"PRIu64"\n", copy_values_total);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "TreeNodesNumber: %"PRIu64"\n", tree_nodes);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "TreeDepth: %"PRIu64"\n", tree_depth);
+  RRDD_STATS_SEND;
+
+  return (0);
+} /* }}} int handle_request_stats */
+
 static int handle_request_flush (int fd, /* {{{ */
     char *buffer, size_t buffer_size)
 {
@@ -708,6 +861,10 @@ static int handle_request (int fd) /* {{{ */
   {
     return (handle_request_flush (fd, buffer_ptr, buffer_size));
   }
+  else if (strcmp (command, "stats") == 0)
+  {
+    return (handle_request_stats (fd, buffer_ptr, buffer_size));
+  }
   else
   {
     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);