X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=0816526bd43a808e549dc4c68b1b4f04aa7dbf82;hb=refs%2Fheads%2Fff%2Frrdd;hp=7a51f5d0545e1a7b5f428c474aea152c596ded10;hpb=2154be5fab4197fdfa551b5a46cc3b12c4571d2a;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 7a51f5d..0816526 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,6 +1,7 @@ /** * RRDTool - src/rrd_daemon.c * Copyright (C) 2008 Florian octo Forster + * Copyright (C) 2008 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -17,8 +18,10 @@ * * Authors: * Florian octo Forster + * kevin brintnall **/ +#if 0 /* * First tell the compiler to stick to the C99 and POSIX standards as close as * possible. @@ -54,6 +57,7 @@ # undef _GNU_SOURCE #endif /* }}} */ +#endif /* 0 */ /* * Now for some includes.. @@ -112,8 +116,8 @@ struct cache_item_s char **values; int values_num; time_t last_flush_time; -#define CI_FLAGS_IN_TREE 0x01 -#define CI_FLAGS_IN_QUEUE 0x02 +#define CI_FLAGS_IN_TREE (1<<0) +#define CI_FLAGS_IN_QUEUE (1<<1) int flags; cache_item_t *next; @@ -122,6 +126,7 @@ struct cache_item_s struct callback_flush_data_s { time_t now; + time_t abs_timeout; char **keys; size_t keys_num; }; @@ -134,9 +139,14 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* max length of socket command or response */ +#define CMD_MAX 4096 + /* * Variables */ +static int stay_foreground = 0; + static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; @@ -144,9 +154,9 @@ static int do_shutdown = 0; static pthread_t queue_thread; -static pthread_t *connetion_threads = NULL; -static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; -static int connetion_threads_num = 0; +static pthread_t *connection_threads = NULL; +static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connection_threads_num = 0; /* Cache stuff */ static GTree *cache_tree = NULL; @@ -158,6 +168,7 @@ static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; static int config_write_interval = 300; +static int config_write_jitter = 0; static int config_flush_interval = 3600; static char *config_pid_file = NULL; static char *config_base_dir = NULL; @@ -166,27 +177,45 @@ static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; -static uint64_t stats_updates_total = 0; -static uint64_t stats_values_total = 0; +static uint64_t stats_updates_received = 0; +static uint64_t stats_flush_received = 0; +static uint64_t stats_updates_written = 0; +static uint64_t stats_data_sets_written = 0; +static uint64_t stats_journal_bytes = 0; +static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; +/* Journaled updates */ +static char *journal_cur = NULL; +static char *journal_old = NULL; +static FILE *journal_fh = NULL; +static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; +static int journal_write(char *cmd, char *args); +static void journal_done(void); +static void journal_rotate(void); + /* * Functions */ static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGINT"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_int_handler */ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGTERM"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_term_handler */ static int write_pidfile (void) /* {{{ */ { pid_t pid; char *file; + int fd; FILE *fh; pid = getpid (); @@ -195,10 +224,19 @@ static int write_pidfile (void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fh = fopen (file, "w"); + fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + if (fd < 0) + { + RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)", + file, rrd_strerror(errno)); + return (-1); + } + + fh = fdopen (fd, "w"); if (fh == NULL) { RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + close(fd); return (-1); } @@ -223,6 +261,99 @@ static int remove_pidfile (void) /* {{{ */ return (errno); } /* }}} int remove_pidfile */ +static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (fd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + return (0); + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + assert (buffer_used > 0); + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + + /* Fix network line endings. */ + if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + { + buffer_used--; + buffer[buffer_used - 1] = 0; + } + + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + /* special case for journal replay */ + if (fd < 0) return 0; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (fd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft -= status; + ptr += status; + } + + return (0); +} /* }}} ssize_t swrite */ + +static void _wipe_ci_values(cache_item_t *ci, time_t when) +{ + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = when; + if (config_write_jitter > 0) + ci->last_flush_time += (random() % config_write_jitter); + + ci->flags &= ~(CI_FLAGS_IN_QUEUE); +} + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -232,9 +363,6 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ { int did_insert = 0; - RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.", - ci->file); - if (ci == NULL) return (-1); @@ -321,7 +449,13 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ ci = (cache_item_t *) value; cfd = (callback_flush_data_t *) data; - if (((cfd->now - ci->last_flush_time) >= config_write_interval) + if ((ci->last_flush_time <= cfd->abs_timeout) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + { + enqueue_cache_item (ci, TAIL); + } + else if ((do_shutdown != 0) && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) && (ci->values_num > 0)) { @@ -350,6 +484,61 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ return (FALSE); } /* }}} gboolean tree_callback_flush */ +static int flush_old_values (int max_age) +{ + callback_flush_data_t cfd; + size_t k; + + memset (&cfd, 0, sizeof (cfd)); + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + cfd.now = time (NULL); + cfd.keys = NULL; + cfd.keys_num = 0; + + if (max_age > 0) + cfd.abs_timeout = cfd.now - max_age; + else + cfd.abs_timeout = cfd.now + 1; + + /* `tree_callback_flush' will return the keys of all values that haven't + * been touched in the last `config_flush_interval' seconds in `cfd'. + * The char*'s in this array point to the same memory as ci->file, so we + * don't need to free them separately. */ + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); + + for (k = 0; k < cfd.keys_num; k++) + { + cache_item_t *ci; + + /* This must not fail. */ + ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); + assert (ci != NULL); + + /* If we end up here with values available, something's seriously + * messed up. */ + assert (ci->values_num == 0); + + /* Remove the node from the tree */ + g_tree_remove (cache_tree, cfd.keys[k]); + cfd.keys[k] = NULL; + + /* Now free and clean up `ci'. */ + free (ci->file); + ci->file = NULL; + free (ci); + ci = NULL; + } /* for (k = 0; k < cfd.keys_num; k++) */ + + if (cfd.keys != NULL) + { + free (cfd.keys); + cfd.keys = NULL; + } + + return (0); +} /* int flush_old_values */ + static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ { struct timeval now; @@ -375,52 +564,19 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { - callback_flush_data_t cfd; - size_t k; - - memset (&cfd, 0, sizeof (cfd)); - /* Pass the current time as user data so that we don't need to call - * `time' for each node. */ - cfd.now = time (NULL); - cfd.keys = NULL; - cfd.keys_num = 0; - - /* `tree_callback_flush' will return the keys of all values that haven't - * been touched in the last `config_flush_interval' seconds in `cfd'. - * The char*'s in this array point to the same memory as ci->file, so we - * don't need to free them separately. */ - g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); - - for (k = 0; k < cfd.keys_num; k++) - { - /* This must not fail. */ - ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); - assert (ci != NULL); - - /* If we end up here with values available, something's seriously - * messed up. */ - assert (ci->values_num == 0); - - /* Remove the node from the tree */ - g_tree_remove (cache_tree, cfd.keys[k]); - cfd.keys[k] = NULL; - - /* Now free and clean up `ci'. */ - free (ci->file); - ci->file = NULL; - free (ci); - ci = NULL; - } /* for (k = 0; k < cfd.keys_num; k++) */ - - if (cfd.keys != NULL) - { - free (cfd.keys); - cfd.keys = NULL; - } + /* Flush all values that haven't been written in the last + * `config_write_interval' seconds. */ + flush_old_values (config_write_interval); /* Determine the time of the next cache flush. */ - while (next_flush.tv_sec < now.tv_sec) + while (next_flush.tv_sec <= now.tv_sec) next_flush.tv_sec += config_flush_interval; + + /* unlock the cache while we rotate so we don't block incoming + * updates if the fsync() blocks on disk I/O */ + pthread_mutex_unlock(&cache_lock); + journal_rotate(); + pthread_mutex_lock(&cache_lock); } /* Now, check if there's something to store away. If not, wait until @@ -435,6 +591,10 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } } + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + /* Check if a value has arrived. This may be NULL if we timed out or there * was an interrupt such as a signal. */ if (cache_queue_head == NULL) @@ -450,14 +610,13 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } + assert(ci->values != NULL); + assert(ci->values_num > 0); + values = ci->values; values_num = ci->values_num; - ci->values = NULL; - ci->values_num = 0; - - ci->last_flush_time = time (NULL); - ci->flags &= ~(CI_FLAGS_IN_QUEUE); + _wipe_ci_values(ci, time(NULL)); cache_queue_head = ci->next; if (cache_queue_head == NULL) @@ -471,32 +630,43 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", - file, values_num, (void *) values); - + rrd_clear_error (); status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { - RRDD_LOG (LOG_ERR, "queue_thread_main: " - "rrd_update_r failed with status %i.", - status); + RRDD_LOG (LOG_NOTICE, "queue_thread_main: " + "rrd_update_r (%s) failed with status %i. (%s)", + file, status, rrd_get_error()); } - free (file); + journal_write("wrote", file); + for (i = 0; i < values_num; i++) free (values[i]); - pthread_mutex_lock (&stats_lock); - stats_updates_total++; - stats_values_total += values_num; - pthread_mutex_unlock (&stats_lock); + free(values); + free(file); + + if (status == 0) + { + pthread_mutex_lock (&stats_lock); + stats_updates_written++; + stats_data_sets_written += values_num; + pthread_mutex_unlock (&stats_lock); + } pthread_mutex_lock (&cache_lock); pthread_cond_broadcast (&flush_cond); - } /* while (do_shutdown == 0) */ + + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting."); + assert(cache_queue_head == NULL); + RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); + journal_done(); return (NULL); } /* }}} void *queue_thread_main */ @@ -521,13 +691,13 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ return (-1); /* This is ensured by `handle_request'. */ - assert (buffer[buffer_size - 1] == ' '); + assert (buffer[buffer_size - 1] == '\0'); status = -1; while (buffer_pos < buffer_size) { /* Check for end-of-field or end-of-buffer */ - if (buffer[buffer_pos] == ' ') + if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0') { field[field_size] = 0; field_size++; @@ -687,11 +857,11 @@ static int handle_request_help (int fd, /* {{{ */ for (i = 0; i < help_text_len; i++) { - status = write (fd, help_text[i], strlen (help_text[i])); + status = swrite (fd, help_text[i], strlen (help_text[i])); if (status < 0) { status = errno; - RRDD_LOG (LOG_ERR, "handle_request_help: write(2) returned an error."); + RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error."); return (status); } } @@ -704,37 +874,45 @@ static int handle_request_stats (int fd, /* {{{ */ size_t buffer_size __attribute__((unused))) { int status; - char outbuf[4096]; + char outbuf[CMD_MAX]; uint64_t copy_queue_length; - uint64_t copy_updates_total; - uint64_t copy_values_total; - - uint64_t tree_nodes; + uint64_t copy_updates_received; + uint64_t copy_flush_received; + uint64_t copy_updates_written; + uint64_t copy_data_sets_written; + uint64_t copy_journal_bytes; + uint64_t copy_journal_rotate; + + uint64_t tree_nodes_number; uint64_t tree_depth; pthread_mutex_lock (&stats_lock); - copy_queue_length = stats_queue_length; - copy_updates_total = stats_updates_total; - copy_values_total = stats_values_total; + copy_queue_length = stats_queue_length; + copy_updates_received = stats_updates_received; + copy_flush_received = stats_flush_received; + copy_updates_written = stats_updates_written; + copy_data_sets_written = stats_data_sets_written; + copy_journal_bytes = stats_journal_bytes; + copy_journal_rotate = stats_journal_rotate; pthread_mutex_unlock (&stats_lock); pthread_mutex_lock (&cache_lock); - tree_nodes = (uint64_t) g_tree_nnodes (cache_tree); - tree_depth = (uint64_t) g_tree_height (cache_tree); + tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree); + tree_depth = (uint64_t) g_tree_height (cache_tree); pthread_mutex_unlock (&cache_lock); #define RRDD_STATS_SEND \ outbuf[sizeof (outbuf) - 1] = 0; \ - status = write (fd, outbuf, strlen (outbuf)); \ + status = swrite (fd, outbuf, strlen (outbuf)); \ if (status < 0) \ { \ status = errno; \ - RRDD_LOG (LOG_INFO, "handle_request_stats: write(2) returned an error."); \ + RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \ return (status); \ } - strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf)); + strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf)); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), @@ -742,22 +920,39 @@ static int handle_request_stats (int fd, /* {{{ */ RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), - "UpdatesWritten: %"PRIu64"\n", copy_updates_total); + "UpdatesReceived: %"PRIu64"\n", copy_updates_received); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof (outbuf), + "FlushesReceived: %"PRIu64"\n", copy_flush_received); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), - "ValuesWritten: %"PRIu64"\n", copy_values_total); + "UpdatesWritten: %"PRIu64"\n", copy_updates_written); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), - "TreeNodesNumber: %"PRIu64"\n", tree_nodes); + "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof (outbuf), + "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), "TreeDepth: %"PRIu64"\n", tree_depth); RRDD_STATS_SEND; + snprintf (outbuf, sizeof(outbuf), + "JournalBytes: %"PRIu64"\n", copy_journal_bytes); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof(outbuf), + "JournalRotate: %"PRIu64"\n", copy_journal_rotate); + RRDD_STATS_SEND; + return (0); +#undef RRDD_STATS_SEND } /* }}} int handle_request_stats */ static int handle_request_flush (int fd, /* {{{ */ @@ -765,7 +960,7 @@ static int handle_request_flush (int fd, /* {{{ */ { char *file; int status; - char result[4096]; + char result[CMD_MAX]; status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -774,11 +969,24 @@ static int handle_request_flush (int fd, /* {{{ */ } else { + pthread_mutex_lock(&stats_lock); + stats_flush_received++; + pthread_mutex_unlock(&stats_lock); + status = flush_file (file); if (status == 0) snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file); else if (status == ENOENT) - snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + { + /* no file in our tree; see whether it exists at all */ + struct stat statbuf; + + memset(&statbuf, 0, sizeof(statbuf)); + if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode)) + snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file); + else + snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + } else if (status < 0) strncpy (result, "-1 Internal error.\n", sizeof (result)); else @@ -786,11 +994,11 @@ static int handle_request_flush (int fd, /* {{{ */ } result[sizeof (result) - 1] = 0; - status = write (fd, result, strlen (result)); + status = swrite (fd, result, strlen (result)); if (status < 0) { status = errno; - RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error."); + RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error."); return (status); } @@ -807,28 +1015,83 @@ static int handle_request_update (int fd, /* {{{ */ time_t now; cache_item_t *ci; - char answer[4096]; + char answer[CMD_MAX]; + +#define RRDD_UPDATE_SEND \ + answer[sizeof (answer) - 1] = 0; \ + status = swrite (fd, answer, strlen (answer)); \ + if (status < 0) \ + { \ + status = errno; \ + RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \ + return (status); \ + } now = time (NULL); status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) { - RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name."); - return (-1); + strncpy (answer, "-1 Usage: UPDATE [ ...]\n", + sizeof (answer)); + RRDD_UPDATE_SEND; + return (0); } + pthread_mutex_lock(&stats_lock); + stats_updates_received++; + pthread_mutex_unlock(&stats_lock); + pthread_mutex_lock (&cache_lock); ci = g_tree_lookup (cache_tree, file); if (ci == NULL) /* {{{ */ { + struct stat statbuf; + + memset (&statbuf, 0, sizeof (statbuf)); + status = stat (file, &statbuf); + if (status != 0) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file); + + status = errno; + if (status == ENOENT) + snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file); + else + snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n", + status); + RRDD_UPDATE_SEND; + return (0); + } + if (!S_ISREG (statbuf.st_mode)) + { + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file); + RRDD_UPDATE_SEND; + return (0); + } + if (access(file, R_OK|W_OK) != 0) + { + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n", + file, rrd_strerror(errno)); + RRDD_UPDATE_SEND; + return (0); + } + ci = (cache_item_t *) malloc (sizeof (cache_item_t)); if (ci == NULL) { pthread_mutex_unlock (&cache_lock); RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); - return (-1); + + strncpy (answer, "-1 malloc failed.\n", sizeof (answer)); + RRDD_UPDATE_SEND; + return (0); } memset (ci, 0, sizeof (cache_item_t)); @@ -836,20 +1099,18 @@ static int handle_request_update (int fd, /* {{{ */ if (ci->file == NULL) { pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); free (ci); - return (-1); + RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); + + strncpy (answer, "-1 strdup failed.\n", sizeof (answer)); + RRDD_UPDATE_SEND; + return (0); } - ci->values = NULL; - ci->values_num = 0; - ci->last_flush_time = now; + _wipe_ci_values(ci, now); ci->flags = CI_FLAGS_IN_TREE; g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); - - RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.", - ci->file); } /* }}} */ assert (ci != NULL); @@ -895,57 +1156,62 @@ static int handle_request_update (int fd, /* {{{ */ pthread_mutex_unlock (&cache_lock); - snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); - answer[sizeof (answer) - 1] = 0; - - status = write (fd, answer, strlen (answer)); - if (status < 0) + if (values_num < 1) { - status = errno; - RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error."); - return (status); + strncpy (answer, "-1 No values updated.\n", sizeof (answer)); } - + else + { + snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num, + (values_num == 1) ? "" : "s"); + } + RRDD_UPDATE_SEND; return (0); +#undef RRDD_UPDATE_SEND } /* }}} int handle_request_update */ -static int handle_request (int fd) /* {{{ */ +/* we came across a "WROTE" entry during journal replay. + * throw away any values that we have accumulated for this file + */ +static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */ + const char *buffer, + size_t buffer_size __attribute__((unused))) { - char buffer[4096]; - size_t buffer_size; - char *buffer_ptr; - char *command; - int status; + int i; + cache_item_t *ci; + const char *file = buffer; - status = read (fd, buffer, sizeof (buffer)); - if (status == 0) - { - return (1); - } - else if (status < 0) - { - RRDD_LOG (LOG_ERR, "handle_request: read(2) failed."); - return (-1); - } - buffer_size = status; - assert (((size_t) buffer_size) <= sizeof (buffer)); + pthread_mutex_lock(&cache_lock); - if (buffer[buffer_size - 1] != '\n') + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) { - RRDD_LOG (LOG_INFO, "handle_request: malformed request."); - return (-1); + pthread_mutex_unlock(&cache_lock); + return (0); } - /* Accept Windows style line endings, too */ - if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r')) + if (ci->values) { - buffer_size--; - buffer[buffer_size - 1] = '\n'; + for (i=0; i < ci->values_num; i++) + free(ci->values[i]); + + free(ci->values); } - /* Place the normal field separator at the end to simplify - * `buffer_get_field's work. */ - buffer[buffer_size - 1] = ' '; + _wipe_ci_values(ci, time(NULL)); + + pthread_mutex_unlock(&cache_lock); + return (0); +} /* }}} int handle_request_wrote */ + +/* if fd < 0, we are in journal replay mode */ +static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */ +{ + char *buffer_ptr; + char *command; + int status; + + assert (buffer[buffer_size - 1] == '\0'); buffer_ptr = buffer; command = NULL; @@ -958,8 +1224,17 @@ static int handle_request (int fd) /* {{{ */ if (strcasecmp (command, "update") == 0) { + /* don't re-write updates in replay mode */ + if (fd >= 0) + journal_write(command, buffer_ptr); + return (handle_request_update (fd, buffer_ptr, buffer_size)); } + else if (strcasecmp (command, "wrote") == 0 && fd < 0) + { + /* this is only valid in replay mode */ + return (handle_request_wrote (fd, buffer_ptr, buffer_size)); + } else if (strcasecmp (command, "flush") == 0) { return (handle_request_flush (fd, buffer_ptr, buffer_size)); @@ -974,15 +1249,15 @@ static int handle_request (int fd) /* {{{ */ } else { - char result[4096]; + char result[CMD_MAX]; snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command); result[sizeof (result) - 1] = 0; - status = write (fd, result, strlen (result)); + status = swrite (fd, result, strlen (result)); if (status < 0) { - RRDD_LOG (LOG_ERR, "handle_request: write(2) failed."); + RRDD_LOG (LOG_ERR, "handle_request: swrite failed."); return (-1); } } @@ -990,36 +1265,174 @@ static int handle_request (int fd) /* {{{ */ return (0); } /* }}} int handle_request */ -static void *connection_thread_main (void *args /* {{{ */ - __attribute__((unused))) +/* MUST NOT hold journal_lock before calling this */ +static void journal_rotate(void) /* {{{ */ +{ + FILE *old_fh = NULL; + + if (journal_cur == NULL || journal_old == NULL) + return; + + pthread_mutex_lock(&journal_lock); + + /* we rotate this way (rename before close) so that the we can release + * the journal lock as fast as possible. Journal writes to the new + * journal can proceed immediately after the new file is opened. The + * fclose can then block without affecting new updates. + */ + if (journal_fh != NULL) + { + old_fh = journal_fh; + rename(journal_cur, journal_old); + ++stats_journal_rotate; + } + + journal_fh = fopen(journal_cur, "a"); + pthread_mutex_unlock(&journal_lock); + + if (old_fh != NULL) + fclose(old_fh); + + if (journal_fh == NULL) + RRDD_LOG(LOG_CRIT, + "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)", + journal_cur, rrd_strerror(errno)); + +} /* }}} static void journal_rotate */ + +static void journal_done(void) /* {{{ */ +{ + if (journal_cur == NULL) + return; + + pthread_mutex_lock(&journal_lock); + if (journal_fh != NULL) + { + fclose(journal_fh); + journal_fh = NULL; + } + + RRDD_LOG(LOG_INFO, "removing journals"); + + unlink(journal_old); + unlink(journal_cur); + pthread_mutex_unlock(&journal_lock); + +} /* }}} static void journal_done */ + +static int journal_write(char *cmd, char *args) /* {{{ */ +{ + int chars; + + if (journal_fh == NULL) + return 0; + + pthread_mutex_lock(&journal_lock); + chars = fprintf(journal_fh, "%s %s\n", cmd, args); + pthread_mutex_unlock(&journal_lock); + + if (chars > 0) + { + pthread_mutex_lock(&stats_lock); + stats_journal_bytes += chars; + pthread_mutex_unlock(&stats_lock); + } + + return chars; +} /* }}} static int journal_write */ + +static int journal_replay (const char *file) /* {{{ */ +{ + FILE *fh; + int entry_cnt = 0; + int fail_cnt = 0; + uint64_t line = 0; + char entry[CMD_MAX]; + + if (file == NULL) return 0; + + fh = fopen(file, "r"); + if (fh == NULL) + { + if (errno != ENOENT) + RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)", + file, rrd_strerror(errno)); + return 0; + } + else + RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file); + + while(!feof(fh)) + { + size_t entry_len; + + ++line; + fgets(entry, sizeof(entry), fh); + entry_len = strlen(entry); + + /* check \n termination in case journal writing crashed mid-line */ + if (entry_len == 0) + continue; + else if (entry[entry_len - 1] != '\n') + { + RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line); + ++fail_cnt; + continue; + } + + entry[entry_len - 1] = '\0'; + + if (handle_request(-1, entry, entry_len) == 0) + ++entry_cnt; + else + ++fail_cnt; + } + + fclose(fh); + + if (entry_cnt > 0) + { + RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)", + entry_cnt, fail_cnt); + return 1; + } + else + return 0; + +} /* }}} static int journal_replay */ + +static void *connection_thread_main (void *args) /* {{{ */ { pthread_t self; int i; int fd; fd = *((int *) args); + free (args); - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); { pthread_t *temp; - temp = (pthread_t *) realloc (connetion_threads, - sizeof (pthread_t) * (connetion_threads_num + 1)); + temp = (pthread_t *) realloc (connection_threads, + sizeof (pthread_t) * (connection_threads_num + 1)); if (temp == NULL) { RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed."); } else { - connetion_threads = temp; - connetion_threads[connetion_threads_num] = pthread_self (); - connetion_threads_num++; + connection_threads = temp; + connection_threads[connection_threads_num] = pthread_self (); + connection_threads_num++; } } - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); while (do_shutdown == 0) { + char buffer[CMD_MAX]; + struct pollfd pollfd; int status; @@ -1053,7 +1466,18 @@ static void *connection_thread_main (void *args /* {{{ */ break; } - status = handle_request (fd); + status = (int) sread (fd, buffer, sizeof (buffer)); + if (status <= 0) + { + close (fd); + + if (status < 0) + RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed."); + + break; + } + + status = handle_request (fd, buffer, /*buffer_size=*/ status); if (status != 0) { close (fd); @@ -1063,25 +1487,24 @@ static void *connection_thread_main (void *args /* {{{ */ self = pthread_self (); /* Remove this thread from the connection threads list */ - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); /* Find out own index in the array */ - for (i = 0; i < connetion_threads_num; i++) - if (pthread_equal (connetion_threads[i], self) != 0) + for (i = 0; i < connection_threads_num; i++) + if (pthread_equal (connection_threads[i], self) != 0) break; - assert (i < connetion_threads_num); + assert (i < connection_threads_num); /* Move the trailing threads forward. */ - if (i < (connetion_threads_num - 1)) + if (i < (connection_threads_num - 1)) { - memmove (connetion_threads + i, - connetion_threads + i + 1, - sizeof (pthread_t) * (connetion_threads_num - i - 1)); + memmove (connection_threads + i, + connection_threads + i + 1, + sizeof (pthread_t) * (connection_threads_num - i - 1)); } - connetion_threads_num--; - pthread_mutex_unlock (&connetion_threads_lock); + connection_threads_num--; + pthread_mutex_unlock (&connection_threads_lock); - free (args); return (NULL); } /* }}} void *connection_thread_main */ @@ -1244,11 +1667,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ int i; for (i = 0; i < config_listen_address_list_len; i++) - { - RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] " - "= %s", i, config_listen_address_list[i]); open_listen_socket (config_listen_address_list[i]); - } if (config_listen_address_list_len < 1) open_listen_socket (RRDCACHED_DEFAULT_ADDRESS); @@ -1269,6 +1688,8 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } memset (pollfds, 0, sizeof (*pollfds) * pollfds_num); + RRDD_LOG(LOG_INFO, "listening for connections"); + while (do_shutdown == 0) { assert (pollfds_num == ((int) listen_fds_num)); @@ -1296,6 +1717,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ struct sockaddr_storage client_sa; socklen_t client_sa_size; pthread_t tid; + pthread_attr_t attr; if (pollfds[i].revents == 0) continue; @@ -1324,7 +1746,10 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } - status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main, + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + + status = pthread_create (&tid, &attr, connection_thread_main, /* args = */ (void *) client_sd); if (status != 0) { @@ -1336,31 +1761,29 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } /* for (pollfds_num) */ } /* while (do_shutdown == 0) */ + RRDD_LOG(LOG_INFO, "starting shutdown"); + close_listen_sockets (); - pthread_mutex_lock (&connetion_threads_lock); - while (connetion_threads_num > 0) + pthread_mutex_lock (&connection_threads_lock); + while (connection_threads_num > 0) { pthread_t wait_for; - wait_for = connetion_threads[0]; + wait_for = connection_threads[0]; - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); pthread_join (wait_for, /* retval = */ NULL); - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); } - pthread_mutex_unlock (&connetion_threads_lock); - - RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting."); + pthread_mutex_unlock (&connection_threads_lock); return (NULL); } /* }}} void *listen_thread_main */ static int daemonize (void) /* {{{ */ { - pid_t child; int status; - char *base_dir; /* These structures are static, because `sigaction' behaves weird if the are * overwritten.. */ @@ -1368,39 +1791,45 @@ static int daemonize (void) /* {{{ */ static struct sigaction sa_term; static struct sigaction sa_pipe; - child = fork (); - if (child < 0) + if (!stay_foreground) { - fprintf (stderr, "daemonize: fork(2) failed.\n"); - return (-1); - } - else if (child > 0) - { - return (1); - } + pid_t child; + char *base_dir; - /* Change into the /tmp directory. */ - base_dir = (config_base_dir != NULL) - ? config_base_dir - : "/tmp"; - status = chdir (base_dir); - if (status != 0) - { - fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir); - return (-1); - } + child = fork (); + if (child < 0) + { + fprintf (stderr, "daemonize: fork(2) failed.\n"); + return (-1); + } + else if (child > 0) + { + return (1); + } - /* Become session leader */ - setsid (); + /* Change into the /tmp directory. */ + base_dir = (config_base_dir != NULL) + ? config_base_dir + : "/tmp"; + status = chdir (base_dir); + if (status != 0) + { + fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir); + return (-1); + } + + /* Become session leader */ + setsid (); - /* Open the first three file descriptors to /dev/null */ - close (2); - close (1); - close (0); + /* Open the first three file descriptors to /dev/null */ + close (2); + close (1); + close (0); - open ("/dev/null", O_RDWR); - dup (0); - dup (0); + open ("/dev/null", O_RDWR); + dup (0); + dup (0); + } /* if (!stay_foreground) */ /* Install signal handlers */ memset (&sa_int, 0, sizeof (sa_int)); @@ -1409,13 +1838,14 @@ static int daemonize (void) /* {{{ */ memset (&sa_term, 0, sizeof (sa_term)); sa_term.sa_handler = sig_term_handler; - sigaction (SIGINT, &sa_term, NULL); + sigaction (SIGTERM, &sa_term, NULL); memset (&sa_pipe, 0, sizeof (sa_pipe)); sa_pipe.sa_handler = SIG_IGN; sigaction (SIGPIPE, &sa_pipe, NULL); openlog ("rrdcached", LOG_PID, LOG_DAEMON); + RRDD_LOG(LOG_INFO, "starting up"); cache_tree = g_tree_new ((GCompareFunc) strcmp); if (cache_tree == NULL) @@ -1424,33 +1854,20 @@ static int daemonize (void) /* {{{ */ return (-1); } - memset (&queue_thread, 0, sizeof (queue_thread)); - status = pthread_create (&queue_thread, /* attr = */ NULL, - queue_thread_main, /* args = */ NULL); - if (status != 0) - { - RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed."); - return (-1); - } - - write_pidfile (); - - return (0); + status = write_pidfile (); + return status; } /* }}} int daemonize */ static int cleanup (void) /* {{{ */ { - RRDD_LOG (LOG_DEBUG, "cleanup ()"); - do_shutdown++; - RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread.."); pthread_cond_signal (&cache_cond); pthread_join (queue_thread, /* return = */ NULL); - RRDD_LOG (LOG_DEBUG, "cleanup: done"); remove_pidfile (); + RRDD_LOG(LOG_INFO, "goodbye"); closelog (); return (0); @@ -1461,10 +1878,14 @@ static int read_options (int argc, char **argv) /* {{{ */ int option; int status = 0; - while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1) + while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1) { switch (option) { + case 'g': + stay_foreground=1; + break; + case 'l': { char **temp; @@ -1518,6 +1939,22 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'z': + { + int temp; + + temp = atoi(optarg); + if (temp > 0) + config_write_jitter = temp; + else + { + fprintf (stderr, "Invalid write jitter: -z %s\n", optarg); + status = 2; + } + + break; + } + case 'b': { size_t len; @@ -1559,6 +1996,41 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'j': + { + struct stat statbuf; + const char *dir = optarg; + + status = stat(dir, &statbuf); + if (status != 0) + { + fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno)); + return 6; + } + + if (!S_ISDIR(statbuf.st_mode) + || access(dir, R_OK|W_OK|X_OK) != 0) + { + fprintf(stderr, "Must specify a writable directory with -j! (%s)\n", + errno ? rrd_strerror(errno) : ""); + return 6; + } + + journal_cur = malloc(PATH_MAX + 1); + journal_old = malloc(PATH_MAX + 1); + if (journal_cur == NULL || journal_old == NULL) + { + fprintf(stderr, "malloc failure for journal files\n"); + return 6; + } + else + { + snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir); + snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir); + } + } + break; + case 'h': case '?': printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" @@ -1568,6 +2040,7 @@ static int read_options (int argc, char **argv) /* {{{ */ "Valid options are:\n" " -l
Socket address to listen to.\n" " -w Interval in which to write data.\n" + " -z Delay writes up to seconds to spread load" \ " -f Interval in which to flush dead data.\n" " -p Location of the PID-file.\n" " -b Base directory to change to.\n" @@ -1581,6 +2054,14 @@ static int read_options (int argc, char **argv) /* {{{ */ } /* switch (option) */ } /* while (getopt) */ + /* advise the user when values are not sane */ + if (config_flush_interval < 2 * config_write_interval) + fprintf(stderr, "WARNING: flush interval (-f) should be at least" + " 2x write interval (-w) !\n"); + if (config_write_jitter > config_write_interval) + fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than" + " write interval (-w) !\n"); + return (status); } /* }}} int read_options */ @@ -1613,8 +2094,40 @@ int main (int argc, char **argv) return (1); } - listen_thread_main (NULL); + if (journal_cur != NULL) + { + int had_journal = 0; + + pthread_mutex_lock(&journal_lock); + + RRDD_LOG(LOG_INFO, "checking for journal files"); + + had_journal += journal_replay(journal_old); + had_journal += journal_replay(journal_cur); + + if (had_journal) + flush_old_values(-1); + pthread_mutex_unlock(&journal_lock); + journal_rotate(); + + RRDD_LOG(LOG_INFO, "journal processing complete"); + } + + /* start the queue thread */ + memset (&queue_thread, 0, sizeof (queue_thread)); + status = pthread_create (&queue_thread, + NULL, /* attr */ + queue_thread_main, + NULL); /* args */ + if (status != 0) + { + RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread"); + cleanup(); + return (1); + } + + listen_thread_main (NULL); cleanup (); return (0);