#include <stdio.h>
#include <unistd.h>
#include <string.h>
+#include <stdint.h>
+#include <inttypes.h>
#include <sys/types.h>
#include <sys/stat.h>
cache_item_t *next;
};
+struct callback_flush_data_s
+{
+ time_t now;
+ char **keys;
+ size_t keys_num;
+};
+typedef struct callback_flush_data_s callback_flush_data_t;
+
enum queue_side_e
{
HEAD,
static char **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;
+static uint64_t stats_queue_length = 0;
+static uint64_t stats_updates_total = 0;
+static uint64_t stats_values_total = 0;
+static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
+
/*
* Functions
*/
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
queue_side_t side)
{
+ int did_insert = 0;
+
RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
ci->file);
if (cache_queue_tail == NULL)
cache_queue_tail = cache_queue_head;
+
+ did_insert = 1;
}
else if (cache_queue_head == ci)
{
else
cache_queue_tail->next = ci;
cache_queue_tail = ci;
+
+ did_insert = 1;
}
ci->flags |= CI_FLAGS_IN_QUEUE;
+ if (did_insert)
+ {
+ pthread_mutex_lock (&stats_lock);
+ stats_queue_length++;
+ pthread_mutex_unlock (&stats_lock);
+ }
+
return (0);
} /* }}} int enqueue_cache_item */
* Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
* while this is in progress.
*/
-static gboolean tree_callback_flush (gpointer key /* {{{ */
- __attribute__((unused)), gpointer value, gpointer data)
+static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
+ gpointer data)
{
cache_item_t *ci;
- time_t now;
-
- key = NULL; /* make compiler happy */
+ callback_flush_data_t *cfd;
ci = (cache_item_t *) value;
- now = *((time_t *) data);
+ cfd = (callback_flush_data_t *) data;
- if (((now - ci->last_flush_time) >= config_write_interval)
+ if (((cfd->now - ci->last_flush_time) >= config_write_interval)
&& ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
&& (ci->values_num > 0))
+ {
enqueue_cache_item (ci, TAIL);
+ }
+ else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
+ && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+ && (ci->values_num <= 0))
+ {
+ char **temp;
- return (TRUE);
+ temp = (char **) realloc (cfd->keys,
+ sizeof (char *) * (cfd->keys_num + 1));
+ if (temp == NULL)
+ {
+ RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
+ return (FALSE);
+ }
+ cfd->keys = temp;
+ /* Make really sure this points to the _same_ place */
+ assert ((char *) key == ci->file);
+ cfd->keys[cfd->keys_num] = (char *) key;
+ cfd->keys_num++;
+ }
+
+ return (FALSE);
} /* }}} gboolean tree_callback_flush */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
|| ((now.tv_sec == next_flush.tv_sec)
&& ((1000 * now.tv_usec) > next_flush.tv_nsec)))
{
- time_t time_now;
+ callback_flush_data_t cfd;
+ size_t k;
+ memset (&cfd, 0, sizeof (cfd));
/* Pass the current time as user data so that we don't need to call
* `time' for each node. */
- time_now = time (NULL);
+ cfd.now = time (NULL);
+ cfd.keys = NULL;
+ cfd.keys_num = 0;
+
+ /* `tree_callback_flush' will return the keys of all values that haven't
+ * been touched in the last `config_flush_interval' seconds in `cfd'.
+ * The char*'s in this array point to the same memory as ci->file, so we
+ * don't need to free them separately. */
+ g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
- g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
+ for (k = 0; k < cfd.keys_num; k++)
+ {
+ /* This must not fail. */
+ ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
+ assert (ci != NULL);
+
+ /* If we end up here with values available, something's seriously
+ * messed up. */
+ assert (ci->values_num == 0);
+
+ /* Remove the node from the tree */
+ g_tree_remove (cache_tree, cfd.keys[k]);
+ cfd.keys[k] = NULL;
+
+ /* Now free and clean up `ci'. */
+ free (ci->file);
+ ci->file = NULL;
+ free (ci);
+ ci = NULL;
+ } /* for (k = 0; k < cfd.keys_num; k++) */
+
+ if (cfd.keys != NULL)
+ {
+ free (cfd.keys);
+ cfd.keys = NULL;
+ }
/* Determine the time of the next cache flush. */
while (next_flush.tv_sec < now.tv_sec)
cache_queue_tail = NULL;
ci->next = NULL;
+ pthread_mutex_lock (&stats_lock);
+ assert (stats_queue_length > 0);
+ stats_queue_length--;
+ pthread_mutex_unlock (&stats_lock);
+
pthread_mutex_unlock (&cache_lock);
RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
for (i = 0; i < values_num; i++)
free (values[i]);
+ pthread_mutex_lock (&stats_lock);
+ stats_updates_total++;
+ stats_values_total += values_num;
+ pthread_mutex_unlock (&stats_lock);
+
pthread_mutex_lock (&cache_lock);
pthread_cond_broadcast (&flush_cond);
} /* while (do_shutdown == 0) */
pthread_mutex_lock (&cache_lock);
- ci = g_tree_lookup (cache_tree, filename);
+ ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
if (ci == NULL)
{
pthread_mutex_unlock (&cache_lock);
return (0);
} /* }}} int flush_file */
+static int handle_request_stats (int fd, /* {{{ */
+ char *buffer __attribute__((unused)),
+ size_t buffer_size __attribute__((unused)))
+{
+ int status;
+ char outbuf[4096];
+
+ uint64_t copy_queue_length;
+ uint64_t copy_updates_total;
+ uint64_t copy_values_total;
+
+ uint64_t tree_nodes;
+ uint64_t tree_depth;
+
+ pthread_mutex_lock (&stats_lock);
+ copy_queue_length = stats_queue_length;
+ copy_updates_total = stats_updates_total;
+ copy_values_total = stats_values_total;
+ pthread_mutex_unlock (&stats_lock);
+
+ pthread_mutex_lock (&cache_lock);
+ tree_nodes = (uint64_t) g_tree_nnodes (cache_tree);
+ tree_depth = (uint64_t) g_tree_height (cache_tree);
+ pthread_mutex_unlock (&cache_lock);
+
+#define RRDD_STATS_SEND \
+ outbuf[sizeof (outbuf) - 1] = 0; \
+ status = write (fd, outbuf, strlen (outbuf)); \
+ if (status < 0) \
+ { \
+ status = errno; \
+ RRDD_LOG (LOG_INFO, "handle_request_stats: write(2) returned an error."); \
+ return (status); \
+ }
+
+ strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf));
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "QueueLength: %"PRIu64"\n", copy_queue_length);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "UpdatesWritten: %"PRIu64"\n", copy_updates_total);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "ValuesWritten: %"PRIu64"\n", copy_values_total);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "TreeNodesNumber: %"PRIu64"\n", tree_nodes);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "TreeDepth: %"PRIu64"\n", tree_depth);
+ RRDD_STATS_SEND;
+
+ return (0);
+} /* }}} int handle_request_stats */
+
static int handle_request_flush (int fd, /* {{{ */
char *buffer, size_t buffer_size)
{
{
return (handle_request_flush (fd, buffer_ptr, buffer_size));
}
+ else if (strcmp (command, "stats") == 0)
+ {
+ return (handle_request_stats (fd, buffer_ptr, buffer_size));
+ }
else
{
RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);