#include <stdio.h>
#include <unistd.h>
#include <string.h>
+#include <strings.h>
+#include <stdint.h>
+#include <inttypes.h>
#include <sys/types.h>
#include <sys/stat.h>
cache_item_t *next;
};
+struct callback_flush_data_s
+{
+ time_t now;
+ char **keys;
+ size_t keys_num;
+};
+typedef struct callback_flush_data_s callback_flush_data_t;
+
enum queue_side_e
{
HEAD,
static char **config_listen_address_list = NULL;
static int config_listen_address_list_len = 0;
+static uint64_t stats_queue_length = 0;
+static uint64_t stats_updates_written = 0;
+static uint64_t stats_data_sets_written = 0;
+static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
+
/*
* Functions
*/
return (errno);
} /* }}} int remove_pidfile */
+static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
+{
+ char *buffer;
+ size_t buffer_used;
+ size_t buffer_free;
+ ssize_t status;
+
+ buffer = (char *) buffer_void;
+ buffer_used = 0;
+ buffer_free = buffer_size;
+
+ while (buffer_free > 0)
+ {
+ status = read (fd, buffer + buffer_used, buffer_free);
+ if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+ continue;
+
+ if (status < 0)
+ return (-1);
+
+ if (status == 0)
+ return (0);
+
+ assert ((0 > status) || (buffer_free >= (size_t) status));
+
+ buffer_free = buffer_free - status;
+ buffer_used = buffer_used + status;
+
+ if (buffer[buffer_used - 1] == '\n')
+ break;
+ }
+
+ assert (buffer_used > 0);
+
+ if (buffer[buffer_used - 1] != '\n')
+ {
+ errno = ENOBUFS;
+ return (-1);
+ }
+
+ buffer[buffer_used - 1] = 0;
+
+ /* Fix network line endings. */
+ if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
+ {
+ buffer_used--;
+ buffer[buffer_used - 1] = 0;
+ }
+
+ return (buffer_used);
+} /* }}} ssize_t sread */
+
+static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
+{
+ const char *ptr;
+ size_t nleft;
+ ssize_t status;
+
+ ptr = (const char *) buf;
+ nleft = count;
+
+ while (nleft > 0)
+ {
+ status = write (fd, (const void *) ptr, nleft);
+
+ if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+ continue;
+
+ if (status < 0)
+ return (status);
+
+ nleft = nleft - status;
+ ptr = ptr + status;
+ }
+
+ return (0);
+} /* }}} ssize_t swrite */
+
/*
* enqueue_cache_item:
* `cache_lock' must be acquired before calling this function!
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
queue_side_t side)
{
+ int did_insert = 0;
+
RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
ci->file);
if (cache_queue_tail == NULL)
cache_queue_tail = cache_queue_head;
+
+ did_insert = 1;
}
else if (cache_queue_head == ci)
{
else
cache_queue_tail->next = ci;
cache_queue_tail = ci;
+
+ did_insert = 1;
}
ci->flags |= CI_FLAGS_IN_QUEUE;
+ if (did_insert)
+ {
+ pthread_mutex_lock (&stats_lock);
+ stats_queue_length++;
+ pthread_mutex_unlock (&stats_lock);
+ }
+
return (0);
} /* }}} int enqueue_cache_item */
* Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
* while this is in progress.
*/
-static gboolean tree_callback_flush (gpointer key /* {{{ */
- __attribute__((unused)), gpointer value, gpointer data)
+static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
+ gpointer data)
{
cache_item_t *ci;
- time_t now;
-
- key = NULL; /* make compiler happy */
+ callback_flush_data_t *cfd;
ci = (cache_item_t *) value;
- now = *((time_t *) data);
+ cfd = (callback_flush_data_t *) data;
- if (((now - ci->last_flush_time) >= config_write_interval)
+ if (((cfd->now - ci->last_flush_time) >= config_write_interval)
&& ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
&& (ci->values_num > 0))
+ {
enqueue_cache_item (ci, TAIL);
+ }
+ else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
+ && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+ && (ci->values_num <= 0))
+ {
+ char **temp;
- return (TRUE);
+ temp = (char **) realloc (cfd->keys,
+ sizeof (char *) * (cfd->keys_num + 1));
+ if (temp == NULL)
+ {
+ RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
+ return (FALSE);
+ }
+ cfd->keys = temp;
+ /* Make really sure this points to the _same_ place */
+ assert ((char *) key == ci->file);
+ cfd->keys[cfd->keys_num] = (char *) key;
+ cfd->keys_num++;
+ }
+
+ return (FALSE);
} /* }}} gboolean tree_callback_flush */
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
|| ((now.tv_sec == next_flush.tv_sec)
&& ((1000 * now.tv_usec) > next_flush.tv_nsec)))
{
- time_t time_now;
+ callback_flush_data_t cfd;
+ size_t k;
+ memset (&cfd, 0, sizeof (cfd));
/* Pass the current time as user data so that we don't need to call
* `time' for each node. */
- time_now = time (NULL);
+ cfd.now = time (NULL);
+ cfd.keys = NULL;
+ cfd.keys_num = 0;
- g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
+ /* `tree_callback_flush' will return the keys of all values that haven't
+ * been touched in the last `config_flush_interval' seconds in `cfd'.
+ * The char*'s in this array point to the same memory as ci->file, so we
+ * don't need to free them separately. */
+ g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
+
+ for (k = 0; k < cfd.keys_num; k++)
+ {
+ /* This must not fail. */
+ ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
+ assert (ci != NULL);
+
+ /* If we end up here with values available, something's seriously
+ * messed up. */
+ assert (ci->values_num == 0);
+
+ /* Remove the node from the tree */
+ g_tree_remove (cache_tree, cfd.keys[k]);
+ cfd.keys[k] = NULL;
+
+ /* Now free and clean up `ci'. */
+ free (ci->file);
+ ci->file = NULL;
+ free (ci);
+ ci = NULL;
+ } /* for (k = 0; k < cfd.keys_num; k++) */
+
+ if (cfd.keys != NULL)
+ {
+ free (cfd.keys);
+ cfd.keys = NULL;
+ }
/* Determine the time of the next cache flush. */
while (next_flush.tv_sec < now.tv_sec)
cache_queue_tail = NULL;
ci->next = NULL;
+ pthread_mutex_lock (&stats_lock);
+ assert (stats_queue_length > 0);
+ stats_queue_length--;
+ pthread_mutex_unlock (&stats_lock);
+
pthread_mutex_unlock (&cache_lock);
RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
for (i = 0; i < values_num; i++)
free (values[i]);
+ pthread_mutex_lock (&stats_lock);
+ stats_updates_written++;
+ stats_data_sets_written += values_num;
+ pthread_mutex_unlock (&stats_lock);
+
pthread_mutex_lock (&cache_lock);
pthread_cond_broadcast (&flush_cond);
} /* while (do_shutdown == 0) */
field = *buffer_ret;
field_size = 0;
+ if (buffer_size <= 0)
+ return (-1);
+
/* This is ensured by `handle_request'. */
assert (buffer[buffer_size - 1] == ' ');
pthread_mutex_lock (&cache_lock);
- ci = g_tree_lookup (cache_tree, filename);
+ ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
if (ci == NULL)
{
pthread_mutex_unlock (&cache_lock);
return (0);
} /* }}} int flush_file */
+static int handle_request_help (int fd, /* {{{ */
+ char *buffer, size_t buffer_size)
+{
+ int status;
+ char **help_text;
+ size_t help_text_len;
+ char *command;
+ size_t i;
+
+ char *help_help[] =
+ {
+ "4 Command overview\n",
+ "FLUSH <filename>\n",
+ "HELP [<command>]\n",
+ "UPDATE <filename> <values> [<values> ...]\n",
+ "STATS\n"
+ };
+ size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
+
+ char *help_flush[] =
+ {
+ "4 Help for FLUSH\n",
+ "Usage: FLUSH <filename>\n",
+ "\n",
+ "Adds the given filename to the head of the update queue and returns\n",
+ "after is has been dequeued.\n"
+ };
+ size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
+
+ char *help_update[] =
+ {
+ "9 Help for UPDATE\n",
+ "Usage: UPDATE <filename> <values> [<values> ...]\n"
+ "\n",
+ "Adds the given file to the internal cache if it is not yet known and\n",
+ "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
+ "for details.\n",
+ "\n",
+ "Each <values> has the following form:\n",
+ " <values> = <time>:<value>[:<value>[...]]\n",
+ "See the rrdupdate(1) manpage for details.\n"
+ };
+ size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
+
+ char *help_stats[] =
+ {
+ "4 Help for STATS\n",
+ "Usage: STATS\n",
+ "\n",
+ "Returns some performance counters, see the rrdcached(1) manpage for\n",
+ "a description of the values.\n"
+ };
+ size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
+
+ status = buffer_get_field (&buffer, &buffer_size, &command);
+ if (status != 0)
+ {
+ help_text = help_help;
+ help_text_len = help_help_len;
+ }
+ else
+ {
+ if (strcasecmp (command, "update") == 0)
+ {
+ help_text = help_update;
+ help_text_len = help_update_len;
+ }
+ else if (strcasecmp (command, "flush") == 0)
+ {
+ help_text = help_flush;
+ help_text_len = help_flush_len;
+ }
+ else if (strcasecmp (command, "stats") == 0)
+ {
+ help_text = help_stats;
+ help_text_len = help_stats_len;
+ }
+ else
+ {
+ help_text = help_help;
+ help_text_len = help_help_len;
+ }
+ }
+
+ for (i = 0; i < help_text_len; i++)
+ {
+ status = swrite (fd, help_text[i], strlen (help_text[i]));
+ if (status < 0)
+ {
+ status = errno;
+ RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
+ return (status);
+ }
+ }
+
+ return (0);
+} /* }}} int handle_request_help */
+
+static int handle_request_stats (int fd, /* {{{ */
+ char *buffer __attribute__((unused)),
+ size_t buffer_size __attribute__((unused)))
+{
+ int status;
+ char outbuf[4096];
+
+ uint64_t copy_queue_length;
+ uint64_t copy_updates_written;
+ uint64_t copy_data_sets_written;
+
+ uint64_t tree_nodes_number;
+ uint64_t tree_depth;
+
+ pthread_mutex_lock (&stats_lock);
+ copy_queue_length = stats_queue_length;
+ copy_updates_written = stats_updates_written;
+ copy_data_sets_written = stats_data_sets_written;
+ pthread_mutex_unlock (&stats_lock);
+
+ pthread_mutex_lock (&cache_lock);
+ tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
+ tree_depth = (uint64_t) g_tree_height (cache_tree);
+ pthread_mutex_unlock (&cache_lock);
+
+#define RRDD_STATS_SEND \
+ outbuf[sizeof (outbuf) - 1] = 0; \
+ status = swrite (fd, outbuf, strlen (outbuf)); \
+ if (status < 0) \
+ { \
+ status = errno; \
+ RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
+ return (status); \
+ }
+
+ strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf));
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "QueueLength: %"PRIu64"\n", copy_queue_length);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
+ RRDD_STATS_SEND;
+
+ snprintf (outbuf, sizeof (outbuf),
+ "TreeDepth: %"PRIu64"\n", tree_depth);
+ RRDD_STATS_SEND;
+
+ return (0);
+#undef RRDD_STATS_SEND
+} /* }}} int handle_request_stats */
+
static int handle_request_flush (int fd, /* {{{ */
char *buffer, size_t buffer_size)
{
status = buffer_get_field (&buffer, &buffer_size, &file);
if (status != 0)
{
- RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name.");
- return (-1);
+ strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
}
-
- status = flush_file (file);
- if (status == 0)
- snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
- else if (status == ENOENT)
- snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
- else if (status < 0)
- strncpy (result, "-1 Internal error.\n", sizeof (result));
else
- snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
+ {
+ status = flush_file (file);
+ if (status == 0)
+ snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
+ else if (status == ENOENT)
+ snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
+ else if (status < 0)
+ strncpy (result, "-1 Internal error.\n", sizeof (result));
+ else
+ snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
+ }
result[sizeof (result) - 1] = 0;
- status = write (fd, result, strlen (result));
+ status = swrite (fd, result, strlen (result));
if (status < 0)
{
status = errno;
- RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error.");
+ RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
return (status);
}
cache_item_t *ci;
char answer[4096];
+#define RRDD_UPDATE_SEND \
+ answer[sizeof (answer) - 1] = 0; \
+ status = swrite (fd, answer, strlen (answer)); \
+ if (status < 0) \
+ { \
+ status = errno; \
+ RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
+ return (status); \
+ }
+
now = time (NULL);
status = buffer_get_field (&buffer, &buffer_size, &file);
if (status != 0)
{
- RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
- return (-1);
+ strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
+ sizeof (answer));
+ RRDD_UPDATE_SEND;
+ return (0);
}
pthread_mutex_lock (&cache_lock);
{
pthread_mutex_unlock (&cache_lock);
RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
- return (-1);
+
+ strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
+ RRDD_UPDATE_SEND;
+ return (0);
}
memset (ci, 0, sizeof (cache_item_t));
if (ci->file == NULL)
{
pthread_mutex_unlock (&cache_lock);
- RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
free (ci);
- return (-1);
+ RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+
+ strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
+ RRDD_UPDATE_SEND;
+ return (0);
}
ci->values = NULL;
pthread_mutex_unlock (&cache_lock);
- snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
- answer[sizeof (answer) - 1] = 0;
-
- status = write (fd, answer, strlen (answer));
- if (status < 0)
+ if (values_num < 1)
{
- status = errno;
- RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
- return (status);
+ strncpy (answer, "-1 No values updated.\n", sizeof (answer));
}
-
+ else
+ {
+ snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
+ (values_num == 1) ? "" : "s");
+ }
+ RRDD_UPDATE_SEND;
return (0);
+#undef RRDD_UPDATE_SEND
} /* }}} int handle_request_update */
static int handle_request (int fd) /* {{{ */
char *command;
int status;
- status = read (fd, buffer, sizeof (buffer));
+ status = (int) sread (fd, buffer, sizeof (buffer));
if (status == 0)
{
return (1);
}
else if (status < 0)
{
- RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
- return (-1);
- }
- buffer_size = status;
- assert (((size_t) buffer_size) <= sizeof (buffer));
-
- if (buffer[buffer_size - 1] != '\n')
- {
- RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
+ RRDD_LOG (LOG_ERR, "handle_request: sread failed.");
return (-1);
}
-
- /* Accept Windows style line endings, too */
- if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r'))
- {
- buffer_size--;
- buffer[buffer_size - 1] = '\n';
- }
+ buffer_size = (size_t) status;
+ assert (buffer_size <= sizeof (buffer));
+ assert (buffer[buffer_size - 1] == 0);
/* Place the normal field separator at the end to simplify
* `buffer_get_field's work. */
return (-1);
}
- if (strcmp (command, "update") == 0)
+ if (strcasecmp (command, "update") == 0)
{
return (handle_request_update (fd, buffer_ptr, buffer_size));
}
- else if (strcmp (command, "flush") == 0)
+ else if (strcasecmp (command, "flush") == 0)
{
return (handle_request_flush (fd, buffer_ptr, buffer_size));
}
+ else if (strcasecmp (command, "stats") == 0)
+ {
+ return (handle_request_stats (fd, buffer_ptr, buffer_size));
+ }
+ else if (strcasecmp (command, "help") == 0)
+ {
+ return (handle_request_help (fd, buffer_ptr, buffer_size));
+ }
else
{
- RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
- return (-1);
+ char result[4096];
+
+ snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
+ result[sizeof (result) - 1] = 0;
+
+ status = swrite (fd, result, strlen (result));
+ if (status < 0)
+ {
+ RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
+ return (-1);
+ }
}
+
+ return (0);
} /* }}} int handle_request */
static void *connection_thread_main (void *args /* {{{ */
memset (&sa_term, 0, sizeof (sa_term));
sa_term.sa_handler = sig_term_handler;
- sigaction (SIGINT, &sa_term, NULL);
+ sigaction (SIGTERM, &sa_term, NULL);
memset (&sa_pipe, 0, sizeof (sa_pipe));
sa_pipe.sa_handler = SIG_IGN;