From: oetiker Date: Sat, 8 Aug 2009 09:38:48 +0000 (+0000) Subject: The journal files are time-stamped and replayed in order. This allows X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=commitdiff_plain;h=9d1a64da41154f4d2f6c803c572f6f7d2684d732 The journal files are time-stamped and replayed in order. This allows systems with 32-bit signed off_t to write more than 2GB of journal entries per flush interval. --kevin git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1885 a5681a0c-68f1-0310-ab6d-d61299d08faa --- diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index f0ad729..199ebdb 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -91,6 +91,7 @@ #include #include +#include #include #include #include @@ -202,6 +203,12 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* describe a set of journal files */ +typedef struct { + char **files; + size_t files_num; +} journal_set; + /* max length of socket command or response */ #define CMD_MAX 4096 #define RBUF_SIZE (CMD_MAX*2) @@ -260,9 +267,13 @@ static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* Journaled updates */ -static char *journal_cur = NULL; -static char *journal_old = NULL; -static FILE *journal_fh = NULL; +#define JOURNAL_BASE "rrd.journal" +static journal_set *journal_cur = NULL; +static journal_set *journal_old = NULL; +static char *journal_dir = NULL; +static FILE *journal_fh = NULL; /* current journal file handle */ +static long journal_size = 0; /* current journal size */ +#define JOURNAL_MAX (1 * 1024 * 1024 * 1024) static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; static int journal_write(char *cmd, char *args); static void journal_done(void); @@ -1745,74 +1756,136 @@ static int handle_request (DISPATCH_PROTO) /* {{{ */ return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size); } /* }}} int handle_request */ -/* MUST NOT hold journal_lock before calling this */ -static void journal_rotate(void) /* {{{ */ +static void journal_set_free (journal_set *js) /* {{{ */ { - FILE *old_fh = NULL; - int new_fd; - - if (journal_cur == NULL || journal_old == NULL) + if (js == NULL) return; - pthread_mutex_lock(&journal_lock); + rrd_free_ptrs((void ***) &js->files, &js->files_num); - /* we rotate this way (rename before close) so that the we can release - * the journal lock as fast as possible. Journal writes to the new - * journal can proceed immediately after the new file is opened. The - * fclose can then block without affecting new updates. - */ - if (journal_fh != NULL) + free(js); +} /* }}} journal_set_free */ + +static void journal_set_remove (journal_set *js) /* {{{ */ +{ + if (js == NULL) + return; + + for (uint i=0; i < js->files_num; i++) { - old_fh = journal_fh; - journal_fh = NULL; - rename(journal_cur, journal_old); - ++stats_journal_rotate; + RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]); + unlink(js->files[i]); } +} /* }}} journal_set_remove */ - new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND, - S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); - if (new_fd >= 0) +/* close current journal file handle. + * MUST hold journal_lock before calling */ +static void journal_close(void) /* {{{ */ +{ + if (journal_fh != NULL) { - journal_fh = fdopen(new_fd, "a"); - if (journal_fh == NULL) - close(new_fd); + if (fclose(journal_fh) != 0) + RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno)); } - pthread_mutex_unlock(&journal_lock); + journal_fh = NULL; + journal_size = 0; +} /* }}} journal_close */ + +/* MUST hold journal_lock before calling */ +static void journal_new_file(void) /* {{{ */ +{ + struct timeval now; + int new_fd; + char new_file[PATH_MAX + 1]; + + assert(journal_dir != NULL); + assert(journal_cur != NULL); - if (old_fh != NULL) - fclose(old_fh); + journal_close(); + gettimeofday(&now, NULL); + /* this format assures that the files sort in strcmp() order */ + snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d", + journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec); + + new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND, + S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (new_fd < 0) + goto error; + + journal_fh = fdopen(new_fd, "a"); if (journal_fh == NULL) - { - RRDD_LOG(LOG_CRIT, - "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)", - journal_cur, rrd_strerror(errno)); + goto error; - RRDD_LOG(LOG_ERR, - "JOURNALING DISABLED: All values will be flushed at shutdown"); - config_flush_at_shutdown = 1; - } + journal_size = ftell(journal_fh); + RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file); + + /* record the file in the journal set */ + rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file); + + return; + +error: + RRDD_LOG(LOG_CRIT, + "JOURNALING DISABLED: Error while trying to create %s : %s", + new_file, rrd_strerror(errno)); + RRDD_LOG(LOG_CRIT, + "JOURNALING DISABLED: All values will be flushed at shutdown"); + + close(new_fd); + config_flush_at_shutdown = 1; + +} /* }}} journal_new_file */ + +/* MUST NOT hold journal_lock before calling this */ +static void journal_rotate(void) /* {{{ */ +{ + journal_set *old_js = NULL; + + if (journal_dir == NULL) + return; + + RRDD_LOG(LOG_DEBUG, "rotating journals"); + + pthread_mutex_lock(&stats_lock); + ++stats_journal_rotate; + pthread_mutex_unlock(&stats_lock); + + pthread_mutex_lock(&journal_lock); + + journal_close(); + + /* rotate the journal sets */ + old_js = journal_old; + journal_old = journal_cur; + journal_cur = calloc(1, sizeof(journal_set)); + + if (journal_cur != NULL) + journal_new_file(); + else + RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n"); + + pthread_mutex_unlock(&journal_lock); + + journal_set_remove(old_js); + journal_set_free (old_js); } /* }}} static void journal_rotate */ +/* MUST hold journal_lock when calling */ static void journal_done(void) /* {{{ */ { if (journal_cur == NULL) return; - pthread_mutex_lock(&journal_lock); - if (journal_fh != NULL) - { - fclose(journal_fh); - journal_fh = NULL; - } + journal_close(); if (config_flush_at_shutdown) { RRDD_LOG(LOG_INFO, "removing journals"); - unlink(journal_old); - unlink(journal_cur); + journal_set_remove(journal_old); + journal_set_remove(journal_cur); } else { @@ -1820,7 +1893,9 @@ static void journal_done(void) /* {{{ */ "journals will be used at next startup"); } - pthread_mutex_unlock(&journal_lock); + journal_set_free(journal_cur); + journal_set_free(journal_old); + free(journal_dir); } /* }}} static void journal_done */ @@ -1833,6 +1908,11 @@ static int journal_write(char *cmd, char *args) /* {{{ */ pthread_mutex_lock(&journal_lock); chars = fprintf(journal_fh, "%s %s\n", cmd, args); + journal_size += chars; + + if (journal_size > JOURNAL_MAX) + journal_new_file(); + pthread_mutex_unlock(&journal_lock); if (chars > 0) @@ -1864,9 +1944,6 @@ static int journal_replay (const char *file) /* {{{ */ memset(&statbuf, 0, sizeof(statbuf)); if (stat(file, &statbuf) != 0) { - if (errno == ENOENT) - return 0; - reason = "stat error"; status = errno; } @@ -1942,25 +2019,79 @@ static int journal_replay (const char *file) /* {{{ */ return entry_cnt > 0 ? 1 : 0; } /* }}} static int journal_replay */ +static int journal_sort(const void *v1, const void *v2) +{ + char **jn1 = (char **) v1; + char **jn2 = (char **) v2; + + return strcmp(*jn1,*jn2); +} + static void journal_init(void) /* {{{ */ { int had_journal = 0; + DIR *dir; + struct dirent *dent; + char path[PATH_MAX+1]; - if (journal_cur == NULL) return; + if (journal_dir == NULL) return; pthread_mutex_lock(&journal_lock); + journal_cur = calloc(1, sizeof(journal_set)); + if (journal_cur == NULL) + { + RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n"); + return; + } + RRDD_LOG(LOG_INFO, "checking for journal files"); - had_journal += journal_replay(journal_old); - had_journal += journal_replay(journal_cur); + /* Handle old journal files during transition. This gives them the + * correct sort order. TODO: remove after first release + */ + { + char old_path[PATH_MAX+1]; + snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" ); + snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000"); + rename(old_path, path); + + snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ); + snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001"); + rename(old_path, path); + } + + dir = opendir(journal_dir); + while ((dent = readdir(dir)) != NULL) + { + /* looks like a journal file? */ + if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE))) + continue; + + snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name); + + if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path)) + { + RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!", + dent->d_name); + break; + } + } + closedir(dir); + + qsort(journal_cur->files, journal_cur->files_num, + sizeof(journal_cur->files[0]), journal_sort); + + for (uint i=0; i < journal_cur->files_num; i++) + had_journal += journal_replay(journal_cur->files[i]); + + journal_new_file(); /* it must have been a crash. start a flush */ if (had_journal && config_flush_at_shutdown) flush_old_values(-1); pthread_mutex_unlock(&journal_lock); - journal_rotate(); RRDD_LOG(LOG_INFO, "journal processing complete"); @@ -2527,17 +2658,16 @@ static int cleanup (void) /* {{{ */ RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); } - journal_done(); - free(queue_threads); free(config_base_dir); free(config_pid_file); - free(journal_cur); - free(journal_old); pthread_mutex_lock(&cache_lock); g_tree_destroy(cache_tree); + pthread_mutex_lock(&journal_lock); + journal_done(); + RRDD_LOG(LOG_INFO, "goodbye"); closelog (); @@ -2720,7 +2850,7 @@ static int read_options (int argc, char **argv) /* {{{ */ case 'j': { struct stat statbuf; - const char *dir = optarg; + const char *dir = journal_dir = strdup(optarg); status = stat(dir, &statbuf); if (status != 0) @@ -2736,19 +2866,6 @@ static int read_options (int argc, char **argv) /* {{{ */ errno ? rrd_strerror(errno) : ""); return 6; } - - journal_cur = malloc(PATH_MAX + 1); - journal_old = malloc(PATH_MAX + 1); - if (journal_cur == NULL || journal_old == NULL) - { - fprintf(stderr, "malloc failure for journal files\n"); - return 6; - } - else - { - snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir); - snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir); - } } break; @@ -2793,7 +2910,7 @@ static int read_options (int argc, char **argv) /* {{{ */ fprintf(stderr, "WARNING: -B does not make sense without -b!\n" " Consult the rrdcached documentation\n"); - if (journal_cur == NULL) + if (journal_dir == NULL) config_flush_at_shutdown = 1; return (status);