X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=0fd0fabc82ec596f6ccdefbc2cb375858b7e8969;hb=11025bcccff1b3cfe42e06cbcec425ef796ec35d;hp=eb94e89544adfcf533b1bc79b12c637f173900f5;hpb=05533cafa4222f3962a9ded2ec88f3469a1139a2;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index eb94e89..0fd0fab 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -17,31 +17,9 @@ * * Authors: * Florian octo Forster + * kevin brintnall **/ -/* - * First tell the compiler to stick to the C99 and POSIX standards as close as - * possible. - */ -#ifndef __STRICT_ANSI__ /* {{{ */ -# define __STRICT_ANSI__ -#endif - -#ifndef _ISOC99_SOURCE -# define _ISOC99_SOURCE -#endif - -#ifdef _POSIX_C_SOURCE -# undef _POSIX_C_SOURCE -#endif -#define _POSIX_C_SOURCE 200112L - -/* Single UNIX needed for strdup. */ -#ifdef _XOPEN_SOURCE -# undef _XOPEN_SOURCE -#endif -#define _XOPEN_SOURCE 500 - #ifndef _REENTRANT # define _REENTRANT #endif @@ -50,9 +28,6 @@ # define _THREAD_SAFE #endif -#ifdef _GNU_SOURCE -# undef _GNU_SOURCE -#endif /* }}} */ /* @@ -112,8 +87,8 @@ struct cache_item_s char **values; int values_num; time_t last_flush_time; -#define CI_FLAGS_IN_TREE 0x01 -#define CI_FLAGS_IN_QUEUE 0x02 +#define CI_FLAGS_IN_TREE (1<<0) +#define CI_FLAGS_IN_QUEUE (1<<1) int flags; cache_item_t *next; @@ -122,6 +97,7 @@ struct cache_item_s struct callback_flush_data_s { time_t now; + time_t abs_timeout; char **keys; size_t keys_num; }; @@ -134,9 +110,14 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* max length of socket command or response */ +#define CMD_MAX 4096 + /* * Variables */ +static int stay_foreground = 0; + static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; @@ -144,9 +125,9 @@ static int do_shutdown = 0; static pthread_t queue_thread; -static pthread_t *connetion_threads = NULL; -static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; -static int connetion_threads_num = 0; +static pthread_t *connection_threads = NULL; +static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connection_threads_num = 0; /* Cache stuff */ static GTree *cache_tree = NULL; @@ -158,6 +139,7 @@ static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; static int config_write_interval = 300; +static int config_write_jitter = 0; static int config_flush_interval = 3600; static char *config_pid_file = NULL; static char *config_base_dir = NULL; @@ -166,27 +148,45 @@ static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; +static uint64_t stats_updates_received = 0; +static uint64_t stats_flush_received = 0; static uint64_t stats_updates_written = 0; static uint64_t stats_data_sets_written = 0; +static uint64_t stats_journal_bytes = 0; +static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; +/* Journaled updates */ +static char *journal_cur = NULL; +static char *journal_old = NULL; +static FILE *journal_fh = NULL; +static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; +static int journal_write(char *cmd, char *args); +static void journal_done(void); +static void journal_rotate(void); + /* * Functions */ static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGINT"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_int_handler */ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGTERM"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_term_handler */ static int write_pidfile (void) /* {{{ */ { pid_t pid; char *file; + int fd; FILE *fh; pid = getpid (); @@ -195,10 +195,19 @@ static int write_pidfile (void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fh = fopen (file, "w"); + fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + if (fd < 0) + { + RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)", + file, rrd_strerror(errno)); + return (-1); + } + + fh = fdopen (fd, "w"); if (fh == NULL) { RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + close(fd); return (-1); } @@ -281,6 +290,9 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ size_t nleft; ssize_t status; + /* special case for journal replay */ + if (fd < 0) return 0; + ptr = (const char *) buf; nleft = count; @@ -294,13 +306,25 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ if (status < 0) return (status); - nleft = nleft - status; - ptr = ptr + status; + nleft -= status; + ptr += status; } return (0); } /* }}} ssize_t swrite */ +static void _wipe_ci_values(cache_item_t *ci, time_t when) +{ + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = when; + if (config_write_jitter > 0) + ci->last_flush_time += (random() % config_write_jitter); + + ci->flags &= ~(CI_FLAGS_IN_QUEUE); +} + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -396,7 +420,13 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ ci = (cache_item_t *) value; cfd = (callback_flush_data_t *) data; - if (((cfd->now - ci->last_flush_time) >= config_write_interval) + if ((ci->last_flush_time <= cfd->abs_timeout) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + { + enqueue_cache_item (ci, TAIL); + } + else if ((do_shutdown != 0) && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) && (ci->values_num > 0)) { @@ -425,6 +455,61 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ return (FALSE); } /* }}} gboolean tree_callback_flush */ +static int flush_old_values (int max_age) +{ + callback_flush_data_t cfd; + size_t k; + + memset (&cfd, 0, sizeof (cfd)); + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + cfd.now = time (NULL); + cfd.keys = NULL; + cfd.keys_num = 0; + + if (max_age > 0) + cfd.abs_timeout = cfd.now - max_age; + else + cfd.abs_timeout = cfd.now + 1; + + /* `tree_callback_flush' will return the keys of all values that haven't + * been touched in the last `config_flush_interval' seconds in `cfd'. + * The char*'s in this array point to the same memory as ci->file, so we + * don't need to free them separately. */ + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); + + for (k = 0; k < cfd.keys_num; k++) + { + cache_item_t *ci; + + /* This must not fail. */ + ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); + assert (ci != NULL); + + /* If we end up here with values available, something's seriously + * messed up. */ + assert (ci->values_num == 0); + + /* Remove the node from the tree */ + g_tree_remove (cache_tree, cfd.keys[k]); + cfd.keys[k] = NULL; + + /* Now free and clean up `ci'. */ + free (ci->file); + ci->file = NULL; + free (ci); + ci = NULL; + } /* for (k = 0; k < cfd.keys_num; k++) */ + + if (cfd.keys != NULL) + { + free (cfd.keys); + cfd.keys = NULL; + } + + return (0); +} /* int flush_old_values */ + static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ { struct timeval now; @@ -450,52 +535,19 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { - callback_flush_data_t cfd; - size_t k; - - memset (&cfd, 0, sizeof (cfd)); - /* Pass the current time as user data so that we don't need to call - * `time' for each node. */ - cfd.now = time (NULL); - cfd.keys = NULL; - cfd.keys_num = 0; - - /* `tree_callback_flush' will return the keys of all values that haven't - * been touched in the last `config_flush_interval' seconds in `cfd'. - * The char*'s in this array point to the same memory as ci->file, so we - * don't need to free them separately. */ - g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); - - for (k = 0; k < cfd.keys_num; k++) - { - /* This must not fail. */ - ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); - assert (ci != NULL); - - /* If we end up here with values available, something's seriously - * messed up. */ - assert (ci->values_num == 0); - - /* Remove the node from the tree */ - g_tree_remove (cache_tree, cfd.keys[k]); - cfd.keys[k] = NULL; - - /* Now free and clean up `ci'. */ - free (ci->file); - ci->file = NULL; - free (ci); - ci = NULL; - } /* for (k = 0; k < cfd.keys_num; k++) */ - - if (cfd.keys != NULL) - { - free (cfd.keys); - cfd.keys = NULL; - } + /* Flush all values that haven't been written in the last + * `config_write_interval' seconds. */ + flush_old_values (config_write_interval); /* Determine the time of the next cache flush. */ - while (next_flush.tv_sec < now.tv_sec) + while (next_flush.tv_sec <= now.tv_sec) next_flush.tv_sec += config_flush_interval; + + /* unlock the cache while we rotate so we don't block incoming + * updates if the fsync() blocks on disk I/O */ + pthread_mutex_unlock(&cache_lock); + journal_rotate(); + pthread_mutex_lock(&cache_lock); } /* Now, check if there's something to store away. If not, wait until @@ -510,6 +562,10 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } } + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + /* Check if a value has arrived. This may be NULL if we timed out or there * was an interrupt such as a signal. */ if (cache_queue_head == NULL) @@ -525,14 +581,13 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } + assert(ci->values != NULL); + assert(ci->values_num > 0); + values = ci->values; values_num = ci->values_num; - ci->values = NULL; - ci->values_num = 0; - - ci->last_flush_time = time (NULL); - ci->flags &= ~(CI_FLAGS_IN_QUEUE); + _wipe_ci_values(ci, time(NULL)); cache_queue_head = ci->next; if (cache_queue_head == NULL) @@ -546,18 +601,23 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); + rrd_clear_error (); status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { - RRDD_LOG (LOG_ERR, "queue_thread_main: " - "rrd_update_r failed with status %i.", - status); + RRDD_LOG (LOG_NOTICE, "queue_thread_main: " + "rrd_update_r (%s) failed with status %i. (%s)", + file, status, rrd_get_error()); } - free (file); + journal_write("wrote", file); + for (i = 0; i < values_num; i++) free (values[i]); + free(values); + free(file); + if (status == 0) { pthread_mutex_lock (&stats_lock); @@ -568,9 +628,17 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock (&cache_lock); pthread_cond_broadcast (&flush_cond); - } /* while (do_shutdown == 0) */ + + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ pthread_mutex_unlock (&cache_lock); + assert(cache_queue_head == NULL); + RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); + journal_done(); + return (NULL); } /* }}} void *queue_thread_main */ @@ -594,13 +662,13 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ return (-1); /* This is ensured by `handle_request'. */ - assert (buffer[buffer_size - 1] == ' '); + assert (buffer[buffer_size - 1] == '\0'); status = -1; while (buffer_pos < buffer_size) { /* Check for end-of-field or end-of-buffer */ - if (buffer[buffer_pos] == ' ') + if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0') { field[field_size] = 0; field_size++; @@ -777,19 +845,27 @@ static int handle_request_stats (int fd, /* {{{ */ size_t buffer_size __attribute__((unused))) { int status; - char outbuf[4096]; + char outbuf[CMD_MAX]; uint64_t copy_queue_length; + uint64_t copy_updates_received; + uint64_t copy_flush_received; uint64_t copy_updates_written; uint64_t copy_data_sets_written; + uint64_t copy_journal_bytes; + uint64_t copy_journal_rotate; uint64_t tree_nodes_number; uint64_t tree_depth; pthread_mutex_lock (&stats_lock); copy_queue_length = stats_queue_length; + copy_updates_received = stats_updates_received; + copy_flush_received = stats_flush_received; copy_updates_written = stats_updates_written; copy_data_sets_written = stats_data_sets_written; + copy_journal_bytes = stats_journal_bytes; + copy_journal_rotate = stats_journal_rotate; pthread_mutex_unlock (&stats_lock); pthread_mutex_lock (&cache_lock); @@ -807,7 +883,7 @@ static int handle_request_stats (int fd, /* {{{ */ return (status); \ } - strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf)); + strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf)); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), @@ -815,6 +891,14 @@ static int handle_request_stats (int fd, /* {{{ */ RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), + "UpdatesReceived: %"PRIu64"\n", copy_updates_received); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof (outbuf), + "FlushesReceived: %"PRIu64"\n", copy_flush_received); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof (outbuf), "UpdatesWritten: %"PRIu64"\n", copy_updates_written); RRDD_STATS_SEND; @@ -830,6 +914,14 @@ static int handle_request_stats (int fd, /* {{{ */ "TreeDepth: %"PRIu64"\n", tree_depth); RRDD_STATS_SEND; + snprintf (outbuf, sizeof(outbuf), + "JournalBytes: %"PRIu64"\n", copy_journal_bytes); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof(outbuf), + "JournalRotate: %"PRIu64"\n", copy_journal_rotate); + RRDD_STATS_SEND; + return (0); #undef RRDD_STATS_SEND } /* }}} int handle_request_stats */ @@ -839,7 +931,7 @@ static int handle_request_flush (int fd, /* {{{ */ { char *file; int status; - char result[4096]; + char result[CMD_MAX]; status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -848,11 +940,24 @@ static int handle_request_flush (int fd, /* {{{ */ } else { + pthread_mutex_lock(&stats_lock); + stats_flush_received++; + pthread_mutex_unlock(&stats_lock); + status = flush_file (file); if (status == 0) snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file); else if (status == ENOENT) - snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + { + /* no file in our tree; see whether it exists at all */ + struct stat statbuf; + + memset(&statbuf, 0, sizeof(statbuf)); + if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode)) + snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file); + else + snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + } else if (status < 0) strncpy (result, "-1 Internal error.\n", sizeof (result)); else @@ -881,7 +986,7 @@ static int handle_request_update (int fd, /* {{{ */ time_t now; cache_item_t *ci; - char answer[4096]; + char answer[CMD_MAX]; #define RRDD_UPDATE_SEND \ answer[sizeof (answer) - 1] = 0; \ @@ -904,11 +1009,51 @@ static int handle_request_update (int fd, /* {{{ */ return (0); } + pthread_mutex_lock(&stats_lock); + stats_updates_received++; + pthread_mutex_unlock(&stats_lock); + pthread_mutex_lock (&cache_lock); ci = g_tree_lookup (cache_tree, file); if (ci == NULL) /* {{{ */ { + struct stat statbuf; + + memset (&statbuf, 0, sizeof (statbuf)); + status = stat (file, &statbuf); + if (status != 0) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file); + + status = errno; + if (status == ENOENT) + snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file); + else + snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n", + status); + RRDD_UPDATE_SEND; + return (0); + } + if (!S_ISREG (statbuf.st_mode)) + { + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file); + RRDD_UPDATE_SEND; + return (0); + } + if (access(file, R_OK|W_OK) != 0) + { + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n", + file, rrd_strerror(errno)); + RRDD_UPDATE_SEND; + return (0); + } + ci = (cache_item_t *) malloc (sizeof (cache_item_t)); if (ci == NULL) { @@ -933,9 +1078,7 @@ static int handle_request_update (int fd, /* {{{ */ return (0); } - ci->values = NULL; - ci->values_num = 0; - ci->last_flush_time = now; + _wipe_ci_values(ci, now); ci->flags = CI_FLAGS_IN_TREE; g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); @@ -998,31 +1141,48 @@ static int handle_request_update (int fd, /* {{{ */ #undef RRDD_UPDATE_SEND } /* }}} int handle_request_update */ -static int handle_request (int fd) /* {{{ */ +/* we came across a "WROTE" entry during journal replay. + * throw away any values that we have accumulated for this file + */ +static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */ + const char *buffer, + size_t buffer_size __attribute__((unused))) { - char buffer[4096]; - size_t buffer_size; - char *buffer_ptr; - char *command; - int status; + int i; + cache_item_t *ci; + const char *file = buffer; - status = (int) sread (fd, buffer, sizeof (buffer)); - if (status == 0) + pthread_mutex_lock(&cache_lock); + + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) { - return (1); + pthread_mutex_unlock(&cache_lock); + return (0); } - else if (status < 0) + + if (ci->values) { - RRDD_LOG (LOG_ERR, "handle_request: sread failed."); - return (-1); + for (i=0; i < ci->values_num; i++) + free(ci->values[i]); + + free(ci->values); } - buffer_size = (size_t) status; - assert (buffer_size <= sizeof (buffer)); - assert (buffer[buffer_size - 1] == 0); - /* Place the normal field separator at the end to simplify - * `buffer_get_field's work. */ - buffer[buffer_size - 1] = ' '; + _wipe_ci_values(ci, time(NULL)); + + pthread_mutex_unlock(&cache_lock); + return (0); +} /* }}} int handle_request_wrote */ + +/* if fd < 0, we are in journal replay mode */ +static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */ +{ + char *buffer_ptr; + char *command; + int status; + + assert (buffer[buffer_size - 1] == '\0'); buffer_ptr = buffer; command = NULL; @@ -1035,8 +1195,17 @@ static int handle_request (int fd) /* {{{ */ if (strcasecmp (command, "update") == 0) { + /* don't re-write updates in replay mode */ + if (fd >= 0) + journal_write(command, buffer_ptr); + return (handle_request_update (fd, buffer_ptr, buffer_size)); } + else if (strcasecmp (command, "wrote") == 0 && fd < 0) + { + /* this is only valid in replay mode */ + return (handle_request_wrote (fd, buffer_ptr, buffer_size)); + } else if (strcasecmp (command, "flush") == 0) { return (handle_request_flush (fd, buffer_ptr, buffer_size)); @@ -1051,7 +1220,7 @@ static int handle_request (int fd) /* {{{ */ } else { - char result[4096]; + char result[CMD_MAX]; snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command); result[sizeof (result) - 1] = 0; @@ -1067,36 +1236,174 @@ static int handle_request (int fd) /* {{{ */ return (0); } /* }}} int handle_request */ -static void *connection_thread_main (void *args /* {{{ */ - __attribute__((unused))) +/* MUST NOT hold journal_lock before calling this */ +static void journal_rotate(void) /* {{{ */ +{ + FILE *old_fh = NULL; + + if (journal_cur == NULL || journal_old == NULL) + return; + + pthread_mutex_lock(&journal_lock); + + /* we rotate this way (rename before close) so that the we can release + * the journal lock as fast as possible. Journal writes to the new + * journal can proceed immediately after the new file is opened. The + * fclose can then block without affecting new updates. + */ + if (journal_fh != NULL) + { + old_fh = journal_fh; + rename(journal_cur, journal_old); + ++stats_journal_rotate; + } + + journal_fh = fopen(journal_cur, "a"); + pthread_mutex_unlock(&journal_lock); + + if (old_fh != NULL) + fclose(old_fh); + + if (journal_fh == NULL) + RRDD_LOG(LOG_CRIT, + "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)", + journal_cur, rrd_strerror(errno)); + +} /* }}} static void journal_rotate */ + +static void journal_done(void) /* {{{ */ +{ + if (journal_cur == NULL) + return; + + pthread_mutex_lock(&journal_lock); + if (journal_fh != NULL) + { + fclose(journal_fh); + journal_fh = NULL; + } + + RRDD_LOG(LOG_INFO, "removing journals"); + + unlink(journal_old); + unlink(journal_cur); + pthread_mutex_unlock(&journal_lock); + +} /* }}} static void journal_done */ + +static int journal_write(char *cmd, char *args) /* {{{ */ +{ + int chars; + + if (journal_fh == NULL) + return 0; + + pthread_mutex_lock(&journal_lock); + chars = fprintf(journal_fh, "%s %s\n", cmd, args); + pthread_mutex_unlock(&journal_lock); + + if (chars > 0) + { + pthread_mutex_lock(&stats_lock); + stats_journal_bytes += chars; + pthread_mutex_unlock(&stats_lock); + } + + return chars; +} /* }}} static int journal_write */ + +static int journal_replay (const char *file) /* {{{ */ +{ + FILE *fh; + int entry_cnt = 0; + int fail_cnt = 0; + uint64_t line = 0; + char entry[CMD_MAX]; + + if (file == NULL) return 0; + + fh = fopen(file, "r"); + if (fh == NULL) + { + if (errno != ENOENT) + RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)", + file, rrd_strerror(errno)); + return 0; + } + else + RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file); + + while(!feof(fh)) + { + size_t entry_len; + + ++line; + fgets(entry, sizeof(entry), fh); + entry_len = strlen(entry); + + /* check \n termination in case journal writing crashed mid-line */ + if (entry_len == 0) + continue; + else if (entry[entry_len - 1] != '\n') + { + RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line); + ++fail_cnt; + continue; + } + + entry[entry_len - 1] = '\0'; + + if (handle_request(-1, entry, entry_len) == 0) + ++entry_cnt; + else + ++fail_cnt; + } + + fclose(fh); + + if (entry_cnt > 0) + { + RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)", + entry_cnt, fail_cnt); + return 1; + } + else + return 0; + +} /* }}} static int journal_replay */ + +static void *connection_thread_main (void *args) /* {{{ */ { pthread_t self; int i; int fd; fd = *((int *) args); + free (args); - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); { pthread_t *temp; - temp = (pthread_t *) realloc (connetion_threads, - sizeof (pthread_t) * (connetion_threads_num + 1)); + temp = (pthread_t *) realloc (connection_threads, + sizeof (pthread_t) * (connection_threads_num + 1)); if (temp == NULL) { RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed."); } else { - connetion_threads = temp; - connetion_threads[connetion_threads_num] = pthread_self (); - connetion_threads_num++; + connection_threads = temp; + connection_threads[connection_threads_num] = pthread_self (); + connection_threads_num++; } } - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); while (do_shutdown == 0) { + char buffer[CMD_MAX]; + struct pollfd pollfd; int status; @@ -1130,7 +1437,18 @@ static void *connection_thread_main (void *args /* {{{ */ break; } - status = handle_request (fd); + status = (int) sread (fd, buffer, sizeof (buffer)); + if (status <= 0) + { + close (fd); + + if (status < 0) + RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed."); + + break; + } + + status = handle_request (fd, buffer, /*buffer_size=*/ status); if (status != 0) { close (fd); @@ -1140,25 +1458,24 @@ static void *connection_thread_main (void *args /* {{{ */ self = pthread_self (); /* Remove this thread from the connection threads list */ - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); /* Find out own index in the array */ - for (i = 0; i < connetion_threads_num; i++) - if (pthread_equal (connetion_threads[i], self) != 0) + for (i = 0; i < connection_threads_num; i++) + if (pthread_equal (connection_threads[i], self) != 0) break; - assert (i < connetion_threads_num); + assert (i < connection_threads_num); /* Move the trailing threads forward. */ - if (i < (connetion_threads_num - 1)) + if (i < (connection_threads_num - 1)) { - memmove (connetion_threads + i, - connetion_threads + i + 1, - sizeof (pthread_t) * (connetion_threads_num - i - 1)); + memmove (connection_threads + i, + connection_threads + i + 1, + sizeof (pthread_t) * (connection_threads_num - i - 1)); } - connetion_threads_num--; - pthread_mutex_unlock (&connetion_threads_lock); + connection_threads_num--; + pthread_mutex_unlock (&connection_threads_lock); - free (args); return (NULL); } /* }}} void *connection_thread_main */ @@ -1342,6 +1659,8 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } memset (pollfds, 0, sizeof (*pollfds) * pollfds_num); + RRDD_LOG(LOG_INFO, "listening for connections"); + while (do_shutdown == 0) { assert (pollfds_num == ((int) listen_fds_num)); @@ -1413,20 +1732,22 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } /* for (pollfds_num) */ } /* while (do_shutdown == 0) */ + RRDD_LOG(LOG_INFO, "starting shutdown"); + close_listen_sockets (); - pthread_mutex_lock (&connetion_threads_lock); - while (connetion_threads_num > 0) + pthread_mutex_lock (&connection_threads_lock); + while (connection_threads_num > 0) { pthread_t wait_for; - wait_for = connetion_threads[0]; + wait_for = connection_threads[0]; - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); pthread_join (wait_for, /* retval = */ NULL); - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); } - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); return (NULL); } /* }}} void *listen_thread_main */ @@ -1443,6 +1764,9 @@ static int daemonize (void) /* {{{ */ static struct sigaction sa_term; static struct sigaction sa_pipe; + if (stay_foreground) + goto child_startup; + child = fork (); if (child < 0) { @@ -1477,6 +1801,7 @@ static int daemonize (void) /* {{{ */ dup (0); dup (0); +child_startup: /* Install signal handlers */ memset (&sa_int, 0, sizeof (sa_int)); sa_int.sa_handler = sig_int_handler; @@ -1491,6 +1816,7 @@ static int daemonize (void) /* {{{ */ sigaction (SIGPIPE, &sa_pipe, NULL); openlog ("rrdcached", LOG_PID, LOG_DAEMON); + RRDD_LOG(LOG_INFO, "starting up"); cache_tree = g_tree_new ((GCompareFunc) strcmp); if (cache_tree == NULL) @@ -1499,18 +1825,8 @@ static int daemonize (void) /* {{{ */ return (-1); } - memset (&queue_thread, 0, sizeof (queue_thread)); - status = pthread_create (&queue_thread, /* attr = */ NULL, - queue_thread_main, /* args = */ NULL); - if (status != 0) - { - RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed."); - return (-1); - } - - write_pidfile (); - - return (0); + status = write_pidfile (); + return status; } /* }}} int daemonize */ static int cleanup (void) /* {{{ */ @@ -1522,6 +1838,7 @@ static int cleanup (void) /* {{{ */ remove_pidfile (); + RRDD_LOG(LOG_INFO, "goodbye"); closelog (); return (0); @@ -1532,10 +1849,14 @@ static int read_options (int argc, char **argv) /* {{{ */ int option; int status = 0; - while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1) + while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1) { switch (option) { + case 'g': + stay_foreground=1; + break; + case 'l': { char **temp; @@ -1589,6 +1910,22 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'z': + { + int temp; + + temp = atoi(optarg); + if (temp > 0) + config_write_jitter = temp; + else + { + fprintf (stderr, "Invalid write jitter: -z %s\n", optarg); + status = 2; + } + + break; + } + case 'b': { size_t len; @@ -1630,6 +1967,41 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'j': + { + struct stat statbuf; + const char *dir = optarg; + + status = stat(dir, &statbuf); + if (status != 0) + { + fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno)); + return 6; + } + + if (!S_ISDIR(statbuf.st_mode) + || access(dir, R_OK|W_OK|X_OK) != 0) + { + fprintf(stderr, "Must specify a writable directory with -j! (%s)\n", + errno ? rrd_strerror(errno) : ""); + return 6; + } + + journal_cur = malloc(PATH_MAX + 1); + journal_old = malloc(PATH_MAX + 1); + if (journal_cur == NULL || journal_old == NULL) + { + fprintf(stderr, "malloc failure for journal files\n"); + return 6; + } + else + { + snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir); + snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir); + } + } + break; + case 'h': case '?': printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" @@ -1639,6 +2011,7 @@ static int read_options (int argc, char **argv) /* {{{ */ "Valid options are:\n" " -l
Socket address to listen to.\n" " -w Interval in which to write data.\n" + " -z Delay writes up to seconds to spread load" \ " -f Interval in which to flush dead data.\n" " -p Location of the PID-file.\n" " -b Base directory to change to.\n" @@ -1652,6 +2025,14 @@ static int read_options (int argc, char **argv) /* {{{ */ } /* switch (option) */ } /* while (getopt) */ + /* advise the user when values are not sane */ + if (config_flush_interval < 2 * config_write_interval) + fprintf(stderr, "WARNING: flush interval (-f) should be at least" + " 2x write interval (-w) !\n"); + if (config_write_jitter > config_write_interval) + fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than" + " write interval (-w) !\n"); + return (status); } /* }}} int read_options */ @@ -1684,8 +2065,40 @@ int main (int argc, char **argv) return (1); } - listen_thread_main (NULL); + if (journal_cur != NULL) + { + int had_journal = 0; + + pthread_mutex_lock(&journal_lock); + + RRDD_LOG(LOG_INFO, "checking for journal files"); + + had_journal += journal_replay(journal_old); + had_journal += journal_replay(journal_cur); + + if (had_journal) + flush_old_values(-1); + + pthread_mutex_unlock(&journal_lock); + journal_rotate(); + + RRDD_LOG(LOG_INFO, "journal processing complete"); + } + /* start the queue thread */ + memset (&queue_thread, 0, sizeof (queue_thread)); + status = pthread_create (&queue_thread, + NULL, /* attr */ + queue_thread_main, + NULL); /* args */ + if (status != 0) + { + RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread"); + cleanup(); + return (1); + } + + listen_thread_main (NULL); cleanup (); return (0);