X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=0fd0fabc82ec596f6ccdefbc2cb375858b7e8969;hb=11025bcccff1b3cfe42e06cbcec425ef796ec35d;hp=3163e1865629abe2879f0bcaeb5dd583f202c66f;hpb=28e399e75eb5a26a5ab1b65b7b18b4cfd69e263d;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 3163e18..0fd0fab 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -17,31 +17,9 @@ * * Authors: * Florian octo Forster + * kevin brintnall **/ -/* - * First tell the compiler to stick to the C99 and POSIX standards as close as - * possible. - */ -#ifndef __STRICT_ANSI__ /* {{{ */ -# define __STRICT_ANSI__ -#endif - -#ifndef _ISOC99_SOURCE -# define _ISOC99_SOURCE -#endif - -#ifdef _POSIX_C_SOURCE -# undef _POSIX_C_SOURCE -#endif -#define _POSIX_C_SOURCE 200112L - -/* Single UNIX needed for strdup. */ -#ifdef _XOPEN_SOURCE -# undef _XOPEN_SOURCE -#endif -#define _XOPEN_SOURCE 500 - #ifndef _REENTRANT # define _REENTRANT #endif @@ -50,9 +28,6 @@ # define _THREAD_SAFE #endif -#ifdef _GNU_SOURCE -# undef _GNU_SOURCE -#endif /* }}} */ /* @@ -66,6 +41,9 @@ #include #include #include +#include +#include +#include #include #include @@ -109,8 +87,8 @@ struct cache_item_s char **values; int values_num; time_t last_flush_time; -#define CI_FLAGS_IN_TREE 0x01 -#define CI_FLAGS_IN_QUEUE 0x02 +#define CI_FLAGS_IN_TREE (1<<0) +#define CI_FLAGS_IN_QUEUE (1<<1) int flags; cache_item_t *next; @@ -119,6 +97,7 @@ struct cache_item_s struct callback_flush_data_s { time_t now; + time_t abs_timeout; char **keys; size_t keys_num; }; @@ -131,9 +110,14 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* max length of socket command or response */ +#define CMD_MAX 4096 + /* * Variables */ +static int stay_foreground = 0; + static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; @@ -141,9 +125,9 @@ static int do_shutdown = 0; static pthread_t queue_thread; -static pthread_t *connetion_threads = NULL; -static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; -static int connetion_threads_num = 0; +static pthread_t *connection_threads = NULL; +static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connection_threads_num = 0; /* Cache stuff */ static GTree *cache_tree = NULL; @@ -155,6 +139,7 @@ static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; static int config_write_interval = 300; +static int config_write_jitter = 0; static int config_flush_interval = 3600; static char *config_pid_file = NULL; static char *config_base_dir = NULL; @@ -162,23 +147,46 @@ static char *config_base_dir = NULL; static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; +static uint64_t stats_queue_length = 0; +static uint64_t stats_updates_received = 0; +static uint64_t stats_flush_received = 0; +static uint64_t stats_updates_written = 0; +static uint64_t stats_data_sets_written = 0; +static uint64_t stats_journal_bytes = 0; +static uint64_t stats_journal_rotate = 0; +static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; + +/* Journaled updates */ +static char *journal_cur = NULL; +static char *journal_old = NULL; +static FILE *journal_fh = NULL; +static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; +static int journal_write(char *cmd, char *args); +static void journal_done(void); +static void journal_rotate(void); + /* * Functions */ static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGINT"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_int_handler */ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGTERM"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_term_handler */ static int write_pidfile (void) /* {{{ */ { pid_t pid; char *file; + int fd; FILE *fh; pid = getpid (); @@ -187,10 +195,19 @@ static int write_pidfile (void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fh = fopen (file, "w"); + fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + if (fd < 0) + { + RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)", + file, rrd_strerror(errno)); + return (-1); + } + + fh = fdopen (fd, "w"); if (fh == NULL) { RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + close(fd); return (-1); } @@ -215,6 +232,99 @@ static int remove_pidfile (void) /* {{{ */ return (errno); } /* }}} int remove_pidfile */ +static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (fd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + return (0); + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + assert (buffer_used > 0); + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + + /* Fix network line endings. */ + if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + { + buffer_used--; + buffer[buffer_used - 1] = 0; + } + + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + /* special case for journal replay */ + if (fd < 0) return 0; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (fd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft -= status; + ptr += status; + } + + return (0); +} /* }}} ssize_t swrite */ + +static void _wipe_ci_values(cache_item_t *ci, time_t when) +{ + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = when; + if (config_write_jitter > 0) + ci->last_flush_time += (random() % config_write_jitter); + + ci->flags &= ~(CI_FLAGS_IN_QUEUE); +} + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -222,8 +332,7 @@ static int remove_pidfile (void) /* {{{ */ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ queue_side_t side) { - RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.", - ci->file); + int did_insert = 0; if (ci == NULL) return (-1); @@ -241,6 +350,8 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ if (cache_queue_tail == NULL) cache_queue_tail = cache_queue_head; + + did_insert = 1; } else if (cache_queue_head == ci) { @@ -279,10 +390,19 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ else cache_queue_tail->next = ci; cache_queue_tail = ci; + + did_insert = 1; } ci->flags |= CI_FLAGS_IN_QUEUE; + if (did_insert) + { + pthread_mutex_lock (&stats_lock); + stats_queue_length++; + pthread_mutex_unlock (&stats_lock); + } + return (0); } /* }}} int enqueue_cache_item */ @@ -300,7 +420,13 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ ci = (cache_item_t *) value; cfd = (callback_flush_data_t *) data; - if (((cfd->now - ci->last_flush_time) >= config_write_interval) + if ((ci->last_flush_time <= cfd->abs_timeout) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + { + enqueue_cache_item (ci, TAIL); + } + else if ((do_shutdown != 0) && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) && (ci->values_num > 0)) { @@ -329,6 +455,61 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ return (FALSE); } /* }}} gboolean tree_callback_flush */ +static int flush_old_values (int max_age) +{ + callback_flush_data_t cfd; + size_t k; + + memset (&cfd, 0, sizeof (cfd)); + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + cfd.now = time (NULL); + cfd.keys = NULL; + cfd.keys_num = 0; + + if (max_age > 0) + cfd.abs_timeout = cfd.now - max_age; + else + cfd.abs_timeout = cfd.now + 1; + + /* `tree_callback_flush' will return the keys of all values that haven't + * been touched in the last `config_flush_interval' seconds in `cfd'. + * The char*'s in this array point to the same memory as ci->file, so we + * don't need to free them separately. */ + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); + + for (k = 0; k < cfd.keys_num; k++) + { + cache_item_t *ci; + + /* This must not fail. */ + ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); + assert (ci != NULL); + + /* If we end up here with values available, something's seriously + * messed up. */ + assert (ci->values_num == 0); + + /* Remove the node from the tree */ + g_tree_remove (cache_tree, cfd.keys[k]); + cfd.keys[k] = NULL; + + /* Now free and clean up `ci'. */ + free (ci->file); + ci->file = NULL; + free (ci); + ci = NULL; + } /* for (k = 0; k < cfd.keys_num; k++) */ + + if (cfd.keys != NULL) + { + free (cfd.keys); + cfd.keys = NULL; + } + + return (0); +} /* int flush_old_values */ + static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ { struct timeval now; @@ -354,52 +535,19 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { - callback_flush_data_t cfd; - size_t k; - - memset (&cfd, 0, sizeof (cfd)); - /* Pass the current time as user data so that we don't need to call - * `time' for each node. */ - cfd.now = time (NULL); - cfd.keys = NULL; - cfd.keys_num = 0; - - /* `tree_callback_flush' will return the keys of all values that haven't - * been touched in the last `config_flush_interval' seconds in `cfd'. - * The char*'s in this array point to the same memory as ci->file, so we - * don't need to free them separately. */ - g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); - - for (k = 0; k < cfd.keys_num; k++) - { - /* This must not fail. */ - ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); - assert (ci != NULL); - - /* If we end up here with values available, something's seriously - * messed up. */ - assert (ci->values_num == 0); - - /* Remove the node from the tree */ - g_tree_remove (cache_tree, cfd.keys[k]); - cfd.keys[k] = NULL; - - /* Now free and clean up `ci'. */ - free (ci->file); - ci->file = NULL; - free (ci); - ci = NULL; - } /* for (k = 0; k < cfd.keys_num; k++) */ - - if (cfd.keys != NULL) - { - free (cfd.keys); - cfd.keys = NULL; - } + /* Flush all values that haven't been written in the last + * `config_write_interval' seconds. */ + flush_old_values (config_write_interval); /* Determine the time of the next cache flush. */ - while (next_flush.tv_sec < now.tv_sec) + while (next_flush.tv_sec <= now.tv_sec) next_flush.tv_sec += config_flush_interval; + + /* unlock the cache while we rotate so we don't block incoming + * updates if the fsync() blocks on disk I/O */ + pthread_mutex_unlock(&cache_lock); + journal_rotate(); + pthread_mutex_lock(&cache_lock); } /* Now, check if there's something to store away. If not, wait until @@ -414,6 +562,10 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } } + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + /* Check if a value has arrived. This may be NULL if we timed out or there * was an interrupt such as a signal. */ if (cache_queue_head == NULL) @@ -429,43 +581,63 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } + assert(ci->values != NULL); + assert(ci->values_num > 0); + values = ci->values; values_num = ci->values_num; - ci->values = NULL; - ci->values_num = 0; - - ci->last_flush_time = time (NULL); - ci->flags &= ~(CI_FLAGS_IN_QUEUE); + _wipe_ci_values(ci, time(NULL)); cache_queue_head = ci->next; if (cache_queue_head == NULL) cache_queue_tail = NULL; ci->next = NULL; - pthread_mutex_unlock (&cache_lock); + pthread_mutex_lock (&stats_lock); + assert (stats_queue_length > 0); + stats_queue_length--; + pthread_mutex_unlock (&stats_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", - file, values_num, (void *) values); + pthread_mutex_unlock (&cache_lock); + rrd_clear_error (); status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { - RRDD_LOG (LOG_ERR, "queue_thread_main: " - "rrd_update_r failed with status %i.", - status); + RRDD_LOG (LOG_NOTICE, "queue_thread_main: " + "rrd_update_r (%s) failed with status %i. (%s)", + file, status, rrd_get_error()); } - free (file); + journal_write("wrote", file); + for (i = 0; i < values_num; i++) free (values[i]); + free(values); + free(file); + + if (status == 0) + { + pthread_mutex_lock (&stats_lock); + stats_updates_written++; + stats_data_sets_written += values_num; + pthread_mutex_unlock (&stats_lock); + } + pthread_mutex_lock (&cache_lock); pthread_cond_broadcast (&flush_cond); - } /* while (do_shutdown == 0) */ + + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting."); + assert(cache_queue_head == NULL); + RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); + journal_done(); return (NULL); } /* }}} void *queue_thread_main */ @@ -486,14 +658,17 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ field = *buffer_ret; field_size = 0; + if (buffer_size <= 0) + return (-1); + /* This is ensured by `handle_request'. */ - assert (buffer[buffer_size - 1] == ' '); + assert (buffer[buffer_size - 1] == '\0'); status = -1; while (buffer_pos < buffer_size) { /* Check for end-of-field or end-of-buffer */ - if (buffer[buffer_pos] == ' ') + if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0') { field[field_size] = 0; field_size++; @@ -567,36 +742,234 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ +static int handle_request_help (int fd, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char **help_text; + size_t help_text_len; + char *command; + size_t i; + + char *help_help[] = + { + "4 Command overview\n", + "FLUSH \n", + "HELP []\n", + "UPDATE [ ...]\n", + "STATS\n" + }; + size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]); + + char *help_flush[] = + { + "4 Help for FLUSH\n", + "Usage: FLUSH \n", + "\n", + "Adds the given filename to the head of the update queue and returns\n", + "after is has been dequeued.\n" + }; + size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]); + + char *help_update[] = + { + "9 Help for UPDATE\n", + "Usage: UPDATE [ ...]\n" + "\n", + "Adds the given file to the internal cache if it is not yet known and\n", + "appends the given value(s) to the entry. See the rrdcached(1) manpage\n", + "for details.\n", + "\n", + "Each has the following form:\n", + " =