+/* MUST NOT hold journal_lock before calling this */
+static void journal_rotate(void) /* {{{ */
+{
+ FILE *old_fh = NULL;
+
+ if (journal_cur == NULL || journal_old == NULL)
+ return;
+
+ pthread_mutex_lock(&journal_lock);
+
+ /* we rotate this way (rename before close) so that the we can release
+ * the journal lock as fast as possible. Journal writes to the new
+ * journal can proceed immediately after the new file is opened. The
+ * fclose can then block without affecting new updates.
+ */
+ if (journal_fh != NULL)
+ {
+ old_fh = journal_fh;
+ rename(journal_cur, journal_old);
+ ++stats_journal_rotate;
+ }
+
+ journal_fh = fopen(journal_cur, "a");
+ pthread_mutex_unlock(&journal_lock);
+
+ if (old_fh != NULL)
+ fclose(old_fh);
+
+ if (journal_fh == NULL)
+ RRDD_LOG(LOG_CRIT,
+ "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
+ journal_cur, rrd_strerror(errno));
+
+} /* }}} static void journal_rotate */
+
+static void journal_done(void) /* {{{ */
+{
+ if (journal_cur == NULL)
+ return;
+
+ pthread_mutex_lock(&journal_lock);
+ if (journal_fh != NULL)
+ {
+ fclose(journal_fh);
+ journal_fh = NULL;
+ }
+
+ RRDD_LOG(LOG_INFO, "removing journals");
+
+ unlink(journal_old);
+ unlink(journal_cur);
+ pthread_mutex_unlock(&journal_lock);
+
+} /* }}} static void journal_done */
+
+static int journal_write(char *cmd, char *args) /* {{{ */
+{
+ int chars;
+
+ if (journal_fh == NULL)
+ return 0;
+
+ pthread_mutex_lock(&journal_lock);
+ chars = fprintf(journal_fh, "%s %s\n", cmd, args);
+ pthread_mutex_unlock(&journal_lock);
+
+ if (chars > 0)
+ {
+ pthread_mutex_lock(&stats_lock);
+ stats_journal_bytes += chars;
+ pthread_mutex_unlock(&stats_lock);
+ }
+
+ return chars;
+} /* }}} static int journal_write */
+
+static int journal_replay (const char *file) /* {{{ */
+{
+ FILE *fh;
+ int entry_cnt = 0;
+ int fail_cnt = 0;
+ uint64_t line = 0;
+ char entry[CMD_MAX];
+
+ if (file == NULL) return 0;
+
+ fh = fopen(file, "r");
+ if (fh == NULL)
+ {
+ if (errno != ENOENT)
+ RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
+ file, rrd_strerror(errno));
+ return 0;
+ }
+ else
+ RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
+
+ while(!feof(fh))
+ {
+ size_t entry_len;
+
+ ++line;
+ fgets(entry, sizeof(entry), fh);
+ entry_len = strlen(entry);
+
+ /* check \n termination in case journal writing crashed mid-line */
+ if (entry_len == 0)
+ continue;
+ else if (entry[entry_len - 1] != '\n')
+ {
+ RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
+ ++fail_cnt;
+ continue;
+ }
+
+ entry[entry_len - 1] = '\0';
+
+ if (handle_request(-1, entry, entry_len) == 0)
+ ++entry_cnt;
+ else
+ ++fail_cnt;
+ }
+
+ fclose(fh);
+
+ if (entry_cnt > 0)
+ {
+ RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
+ entry_cnt, fail_cnt);
+ return 1;
+ }
+ else
+ return 0;
+
+} /* }}} static int journal_replay */
+
+static void *connection_thread_main (void *args) /* {{{ */