X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=2a35ec2abe59cfc2f87a316a0e097c051879a98d;hp=a0e6bdbf9f629d62e8408548cffd39edc9843154;hb=ff4657f155a7acb054aaa702a7c2f610991bd0e9;hpb=fe50f514d5602b06ff585ad3aa168d609f6a1fcc diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index a0e6bdb..2a35ec2 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -62,23 +62,37 @@ /* * Now for some includes.. */ -#include "rrd.h" /* {{{ */ +/* {{{ */ +#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H) +#include "../win32/config.h" +#else +#ifdef HAVE_CONFIG_H +#include "../rrd_config.h" +#endif +#endif + +#include "rrd.h" #include "rrd_client.h" #include + +#ifndef WIN32 #include -#include #include -#include #include -#include #include +# include + +#else + +#endif +#include +#include #include #include #include #include -#include #include #include #include @@ -107,27 +121,68 @@ typedef enum PRIV_HIGH } socket_privilege; +typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code; + struct listen_socket_s { int fd; char addr[PATH_MAX + 1]; int family; socket_privilege privilege; + + /* state for BATCH processing */ + time_t batch_start; + int batch_cmd; + + /* buffered IO */ + char *rbuf; + off_t next_cmd; + off_t next_read; + + char *wbuf; + ssize_t wbuf_len; }; typedef struct listen_socket_s listen_socket_t; +struct command; +/* note: guard against "unused" warnings in the handlers */ +#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\ + time_t now __attribute__((unused)),\ + char *buffer __attribute__((unused)),\ + size_t buffer_size __attribute__((unused)) + +#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\ + DISPATCH_PROTO + +struct command { + char *cmd; + int (*handler)(HANDLER_PROTO); + socket_privilege min_priv; + + char context; /* where we expect to see it */ +#define CMD_CONTEXT_CLIENT (1<<0) +#define CMD_CONTEXT_BATCH (1<<1) +#define CMD_CONTEXT_JOURNAL (1<<2) +#define CMD_CONTEXT_ANY (0x7f) + + char *syntax; + char *help; +}; + struct cache_item_s; typedef struct cache_item_s cache_item_t; struct cache_item_s { char *file; char **values; - int values_num; + size_t values_num; time_t last_flush_time; + time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) #define CI_FLAGS_IN_QUEUE (1<<1) int flags; pthread_cond_t flushed; + cache_item_t *prev; cache_item_t *next; }; @@ -149,21 +204,32 @@ typedef enum queue_side_e queue_side_t; /* max length of socket command or response */ #define CMD_MAX 4096 +#define RBUF_SIZE (CMD_MAX*2) /* * Variables */ static int stay_foreground = 0; +static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; -static int do_shutdown = 0; +enum { + RUNNING, /* normal operation */ + FLUSHING, /* flushing remaining values */ + SHUTDOWN /* shutting down */ +} state = RUNNING; -static pthread_t queue_thread; +static pthread_t *queue_threads; +static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; +static int config_queue_threads = 4; + +static pthread_t flush_thread; +static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; -static pthread_t *connection_threads = NULL; static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER; static int connection_threads_num = 0; /* Cache stuff */ @@ -171,7 +237,6 @@ static GTree *cache_tree = NULL; static cache_item_t *cache_queue_head = NULL; static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; static int config_write_interval = 300; static int config_write_jitter = 0; @@ -179,9 +244,11 @@ static int config_flush_interval = 3600; static int config_flush_at_shutdown = 0; static char *config_pid_file = NULL; static char *config_base_dir = NULL; +static size_t _config_base_dir_len = 0; +static int config_write_base_only = 0; static listen_socket_t **config_listen_address_list = NULL; -static int config_listen_address_list_len = 0; +static size_t config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; static uint64_t stats_updates_received = 0; @@ -201,14 +268,18 @@ static int journal_write(char *cmd, char *args); static void journal_done(void); static void journal_rotate(void); +/* prototypes for forward refernces */ +static int handle_request_help (HANDLER_PROTO); + /* * Functions */ static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - do_shutdown++; - pthread_cond_broadcast(&cache_cond); + state = FLUSHING; + pthread_cond_broadcast(&flush_cond); + pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ @@ -266,7 +337,7 @@ static void install_signal_handlers(void) /* {{{ */ } /* }}} void install_signal_handlers */ -static int open_pidfile(void) /* {{{ */ +static int open_pidfile(char *action, int oflag) /* {{{ */ { int fd; char *file; @@ -275,14 +346,58 @@ static int open_pidfile(void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) - fprintf(stderr, "FATAL: cannot create '%s' (%s)\n", - file, rrd_strerror(errno)); + fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", + action, file, rrd_strerror(errno)); return(fd); } /* }}} static int open_pidfile */ +/* check existing pid file to see whether a daemon is running */ +static int check_pidfile(void) +{ + int pid_fd; + pid_t pid; + char pid_str[16]; + + pid_fd = open_pidfile("open", O_RDWR); + if (pid_fd < 0) + return pid_fd; + + if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0) + return -1; + + pid = atoi(pid_str); + if (pid <= 0) + return -1; + + /* another running process that we can signal COULD be + * a competing rrdcached */ + if (pid != getpid() && kill(pid, 0) == 0) + { + fprintf(stderr, + "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid); + close(pid_fd); + return -1; + } + + lseek(pid_fd, 0, SEEK_SET); + if (ftruncate(pid_fd, 0) == -1) + { + fprintf(stderr, + "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid); + close(pid_fd); + return -1; + } + + fprintf(stderr, + "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" + "rrdcached: starting normally.\n", pid); + + return pid_fd; +} /* }}} static int check_pidfile */ + static int write_pidfile (int fd) /* {{{ */ { pid_t pid; @@ -319,99 +434,239 @@ static int remove_pidfile (void) /* {{{ */ return (errno); } /* }}} int remove_pidfile */ -static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */ { - char *buffer; - size_t buffer_used; - size_t buffer_free; - ssize_t status; + char *eol; - buffer = (char *) buffer_void; - buffer_used = 0; - buffer_free = buffer_size; + eol = memchr(sock->rbuf + sock->next_cmd, '\n', + sock->next_read - sock->next_cmd); - while (buffer_free > 0) + if (eol == NULL) { - status = read (fd, buffer + buffer_used, buffer_free); - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; - - if (status < 0) - return (-1); + /* no commands left, move remainder back to front of rbuf */ + memmove(sock->rbuf, sock->rbuf + sock->next_cmd, + sock->next_read - sock->next_cmd); + sock->next_read -= sock->next_cmd; + sock->next_cmd = 0; + *len = 0; + return NULL; + } + else + { + char *cmd = sock->rbuf + sock->next_cmd; + *eol = '\0'; - if (status == 0) - return (0); + sock->next_cmd = eol - sock->rbuf + 1; - assert ((0 > status) || (buffer_free >= (size_t) status)); + if (eol > sock->rbuf && *(eol-1) == '\r') + *(--eol) = '\0'; /* handle "\r\n" EOL */ - buffer_free = buffer_free - status; - buffer_used = buffer_used + status; + *len = eol - cmd; - if (buffer[buffer_used - 1] == '\n') - break; + return cmd; } - assert (buffer_used > 0); + /* NOTREACHED */ + assert(1==0); +} - if (buffer[buffer_used - 1] != '\n') +/* add the characters directly to the write buffer */ +static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ +{ + char *new_buf; + + assert(sock != NULL); + + new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1); + if (new_buf == NULL) { - errno = ENOBUFS; - return (-1); + RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed"); + return -1; } - buffer[buffer_used - 1] = 0; + strncpy(new_buf + sock->wbuf_len, str, len + 1); + + sock->wbuf = new_buf; + sock->wbuf_len += len; - /* Fix network line endings. */ - if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + return 0; +} /* }}} static int add_to_wbuf */ + +/* add the text to the "extra" info that's sent after the status line */ +static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ +{ + va_list argp; + char buffer[CMD_MAX]; + int len; + + if (sock == NULL) return 0; /* journal replay mode */ + if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ + + va_start(argp, fmt); +#ifdef HAVE_VSNPRINTF + len = vsnprintf(buffer, sizeof(buffer), fmt, argp); +#else + len = vsprintf(buffer, fmt, argp); +#endif + va_end(argp); + if (len < 0) { - buffer_used--; - buffer[buffer_used - 1] = 0; + RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed"); + return -1; } - return (buffer_used); -} /* }}} ssize_t sread */ + return add_to_wbuf(sock, buffer, len); +} /* }}} static int add_response_info */ -static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +static int count_lines(char *str) /* {{{ */ { - const char *ptr; - size_t nleft; - ssize_t status; + int lines = 0; + + if (str != NULL) + { + while ((str = strchr(str, '\n')) != NULL) + { + ++lines; + ++str; + } + } - /* special case for journal replay */ - if (fd < 0) return 0; + return lines; +} /* }}} static int count_lines */ - ptr = (const char *) buf; - nleft = count; +/* send the response back to the user. + * returns 0 on success, -1 on error + * write buffer is always zeroed after this call */ +static int send_response (listen_socket_t *sock, response_code rc, + char *fmt, ...) /* {{{ */ +{ + va_list argp; + char buffer[CMD_MAX]; + int lines; + ssize_t wrote; + int rclen, len; + + if (sock == NULL) return rc; /* journal replay mode */ - while (nleft > 0) + if (sock->batch_start) { - status = write (fd, (const void *) ptr, nleft); + if (rc == RESP_OK) + return rc; /* no response on success during BATCH */ + lines = sock->batch_cmd; + } + else if (rc == RESP_OK) + lines = count_lines(sock->wbuf); + else + lines = -1; + + rclen = sprintf(buffer, "%d ", lines); + va_start(argp, fmt); +#ifdef HAVE_VSNPRINTF + len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp); +#else + len = vsprintf(buffer+rclen, fmt, argp); +#endif + va_end(argp); + if (len < 0) + return -1; - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; + len += rclen; - if (status < 0) - return (status); + /* append the result to the wbuf, don't write to the user */ + if (sock->batch_start) + return add_to_wbuf(sock, buffer, len); + + /* first write must be complete */ + if (len != write(sock->fd, buffer, len)) + { + RRDD_LOG(LOG_INFO, "send_response: could not write status message"); + return -1; + } - nleft -= status; - ptr += status; + if (sock->wbuf != NULL && rc == RESP_OK) + { + wrote = 0; + while (wrote < sock->wbuf_len) + { + ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote); + if (wb <= 0) + { + RRDD_LOG(LOG_INFO, "send_response: could not write results"); + return -1; + } + wrote += wb; + } } - return (0); -} /* }}} ssize_t swrite */ + free(sock->wbuf); sock->wbuf = NULL; + sock->wbuf_len = 0; + + return 0; +} /* }}} */ -static void _wipe_ci_values(cache_item_t *ci, time_t when) +static void wipe_ci_values(cache_item_t *ci, time_t when) { ci->values = NULL; ci->values_num = 0; ci->last_flush_time = when; if (config_write_jitter > 0) - ci->last_flush_time += (random() % config_write_jitter); - - ci->flags &= ~(CI_FLAGS_IN_QUEUE); + ci->last_flush_time += (rrd_random() % config_write_jitter); } +/* remove_from_queue + * remove a "cache_item_t" item from the queue. + * must hold 'cache_lock' when calling this + */ +static void remove_from_queue(cache_item_t *ci) /* {{{ */ +{ + if (ci == NULL) return; + if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */ + + if (ci->prev == NULL) + cache_queue_head = ci->next; /* reset head */ + else + ci->prev->next = ci->next; + + if (ci->next == NULL) + cache_queue_tail = ci->prev; /* reset the tail */ + else + ci->next->prev = ci->prev; + + ci->next = ci->prev = NULL; + ci->flags &= ~CI_FLAGS_IN_QUEUE; + + pthread_mutex_lock (&stats_lock); + assert (stats_queue_length > 0); + stats_queue_length--; + pthread_mutex_unlock (&stats_lock); + +} /* }}} static void remove_from_queue */ + +/* free the resources associated with the cache_item_t + * must hold cache_lock when calling this function + */ +static void *free_cache_item(cache_item_t *ci) /* {{{ */ +{ + if (ci == NULL) return NULL; + + remove_from_queue(ci); + + for (size_t i=0; i < ci->values_num; i++) + free(ci->values[i]); + + free (ci->values); + free (ci->file); + + /* in case anyone is waiting */ + pthread_cond_broadcast(&ci->flushed); + pthread_cond_destroy(&ci->flushed); + + free (ci); + + return NULL; +} /* }}} static void *free_cache_item */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -419,8 +674,6 @@ static void _wipe_ci_values(cache_item_t *ci, time_t when) static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ queue_side_t side) { - int did_insert = 0; - if (ci == NULL) return (-1); @@ -429,74 +682,53 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ if (side == HEAD) { - if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) - { - assert (ci->next == NULL); - ci->next = cache_queue_head; - cache_queue_head = ci; - - if (cache_queue_tail == NULL) - cache_queue_tail = cache_queue_head; + if (cache_queue_head == ci) + return 0; - did_insert = 1; - } - else if (cache_queue_head == ci) - { - /* do nothing */ - } - else /* enqueued, but not first entry */ - { - cache_item_t *prev; + /* remove if further down in queue */ + remove_from_queue(ci); - /* find previous entry */ - for (prev = cache_queue_head; prev != NULL; prev = prev->next) - if (prev->next == ci) - break; - assert (prev != NULL); + ci->prev = NULL; + ci->next = cache_queue_head; + if (ci->next != NULL) + ci->next->prev = ci; + cache_queue_head = ci; - /* move to the front */ - prev->next = ci->next; - ci->next = cache_queue_head; - cache_queue_head = ci; - - /* check if we need to adapt the tail */ - if (cache_queue_tail == ci) - cache_queue_tail = prev; - } + if (cache_queue_tail == NULL) + cache_queue_tail = cache_queue_head; } else /* (side == TAIL) */ { /* We don't move values back in the list.. */ - if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + if (ci->flags & CI_FLAGS_IN_QUEUE) return (0); assert (ci->next == NULL); + assert (ci->prev == NULL); + + ci->prev = cache_queue_tail; if (cache_queue_tail == NULL) cache_queue_head = ci; else cache_queue_tail->next = ci; - cache_queue_tail = ci; - did_insert = 1; + cache_queue_tail = ci; } ci->flags |= CI_FLAGS_IN_QUEUE; - if (did_insert) - { - pthread_cond_broadcast(&cache_cond); - pthread_mutex_lock (&stats_lock); - stats_queue_length++; - pthread_mutex_unlock (&stats_lock); - } + pthread_cond_signal(&queue_cond); + pthread_mutex_lock (&stats_lock); + stats_queue_length++; + pthread_mutex_unlock (&stats_lock); return (0); } /* }}} int enqueue_cache_item */ /* * tree_callback_flush: - * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held + * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held * while this is in progress. */ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ @@ -508,36 +740,23 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ ci = (cache_item_t *) value; cfd = (callback_flush_data_t *) data; - if ((ci->last_flush_time <= cfd->abs_timeout) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) - && (ci->values_num > 0)) - { - enqueue_cache_item (ci, TAIL); - } - else if ((do_shutdown != 0) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) - && (ci->values_num > 0)) + if (ci->flags & CI_FLAGS_IN_QUEUE) + return FALSE; + + if (ci->values_num > 0 + && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING)) { enqueue_cache_item (ci, TAIL); } else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) && (ci->values_num <= 0)) { - char **temp; - - temp = (char **) realloc (cfd->keys, - sizeof (char *) * (cfd->keys_num + 1)); - if (temp == NULL) + assert ((char *) key == ci->file); + if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key)) { - RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed."); + RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed."); return (FALSE); } - cfd->keys = temp; - /* Make really sure this points to the _same_ place */ - assert ((char *) key == ci->file); - cfd->keys[cfd->keys_num] = (char *) key; - cfd->keys_num++; } return (FALSE); @@ -568,26 +787,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { - cache_item_t *ci; - - /* This must not fail. */ - ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); - assert (ci != NULL); - - /* If we end up here with values available, something's seriously - * messed up. */ - assert (ci->values_num == 0); - - /* Remove the node from the tree */ - g_tree_remove (cache_tree, cfd.keys[k]); - cfd.keys[k] = NULL; - - /* Now free and clean up `ci'. */ - free (ci->file); - ci->file = NULL; - free (ci); - ci = NULL; - } /* for (k = 0; k < cfd.keys_num; k++) */ + /* should never fail, since we have held the cache_lock + * the entire time */ + assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE ); + } if (cfd.keys != NULL) { @@ -598,40 +801,34 @@ static int flush_old_values (int max_age) return (0); } /* int flush_old_values */ -static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ { struct timeval now; struct timespec next_flush; - int final_flush = 0; /* make sure we only flush once on shutdown */ + int status; gettimeofday (&now, NULL); next_flush.tv_sec = now.tv_sec + config_flush_interval; next_flush.tv_nsec = 1000 * now.tv_usec; - pthread_mutex_lock (&cache_lock); - while ((do_shutdown == 0) || (cache_queue_head != NULL)) - { - cache_item_t *ci; - char *file; - char **values; - int values_num; - int status; - int i; + pthread_mutex_lock(&cache_lock); - /* First, check if it's time to do the cache flush. */ + while (state == RUNNING) + { gettimeofday (&now, NULL); if ((now.tv_sec > next_flush.tv_sec) || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { + RRDD_LOG(LOG_DEBUG, "flushing old values"); + + /* Determine the time of the next cache flush. */ + next_flush.tv_sec = now.tv_sec + config_flush_interval; + /* Flush all values that haven't been written in the last * `config_write_interval' seconds. */ flush_old_values (config_write_interval); - /* Determine the time of the next cache flush. */ - while (next_flush.tv_sec <= now.tv_sec) - next_flush.tv_sec += config_flush_interval; - /* unlock the cache while we rotate so we don't block incoming * updates if the fsync() blocks on disk I/O */ pthread_mutex_unlock(&cache_lock); @@ -639,28 +836,49 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock(&cache_lock); } + status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush); + if (status != 0 && status != ETIMEDOUT) + { + RRDD_LOG (LOG_ERR, "flush_thread_main: " + "pthread_cond_timedwait returned %i.", status); + } + } + + if (config_flush_at_shutdown) + flush_old_values (-1); /* flush everything */ + + state = SHUTDOWN; + + pthread_mutex_unlock(&cache_lock); + + return NULL; +} /* void *flush_thread_main */ + +static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +{ + pthread_mutex_lock (&cache_lock); + + while (state != SHUTDOWN + || (cache_queue_head != NULL && config_flush_at_shutdown)) + { + cache_item_t *ci; + char *file; + char **values; + size_t values_num; + int status; + /* Now, check if there's something to store away. If not, wait until - * something comes in or it's time to do the cache flush. if we are - * shutting down, do not wait around. */ - if (cache_queue_head == NULL && !do_shutdown) + * something comes in. */ + if (cache_queue_head == NULL) { - status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush); + status = pthread_cond_wait (&queue_cond, &cache_lock); if ((status != 0) && (status != ETIMEDOUT)) { RRDD_LOG (LOG_ERR, "queue_thread_main: " - "pthread_cond_timedwait returned %i.", status); + "pthread_cond_wait returned %i.", status); } } - /* We're about to shut down */ - if (do_shutdown != 0 && !final_flush++) - { - if (config_flush_at_shutdown) - flush_old_values (-1); /* flush everything */ - else - break; - } - /* Check if a value has arrived. This may be NULL if we timed out or there * was an interrupt such as a signal. */ if (cache_queue_head == NULL) @@ -682,22 +900,13 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ values = ci->values; values_num = ci->values_num; - _wipe_ci_values(ci, time(NULL)); - - cache_queue_head = ci->next; - if (cache_queue_head == NULL) - cache_queue_tail = NULL; - ci->next = NULL; - - pthread_mutex_lock (&stats_lock); - assert (stats_queue_length > 0); - stats_queue_length--; - pthread_mutex_unlock (&stats_lock); + wipe_ci_values(ci, time(NULL)); + remove_from_queue(ci); pthread_mutex_unlock (&cache_lock); rrd_clear_error (); - status = rrd_update_r (file, NULL, values_num, (void *) values); + status = rrd_update_r (file, NULL, (int) values_num, (void *) values); if (status != 0) { RRDD_LOG (LOG_NOTICE, "queue_thread_main: " @@ -706,12 +915,16 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } journal_write("wrote", file); - pthread_cond_broadcast(&ci->flushed); - for (i = 0; i < values_num; i++) - free (values[i]); + /* Search again in the tree. It's possible someone issued a "FORGET" + * while we were writing the update values. */ + pthread_mutex_lock(&cache_lock); + ci = (cache_item_t *) g_tree_lookup(cache_tree, file); + if (ci) + pthread_cond_broadcast(&ci->flushed); + pthread_mutex_unlock(&cache_lock); - free(values); + rrd_free_ptrs((void ***) &values, &values_num); free(file); if (status == 0) @@ -723,25 +936,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } pthread_mutex_lock (&cache_lock); - - /* We're about to shut down */ - if (do_shutdown != 0 && !final_flush++) - { - if (config_flush_at_shutdown) - flush_old_values (-1); /* flush everything */ - else - break; - } - } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ - pthread_mutex_unlock (&cache_lock); - - if (config_flush_at_shutdown) - { - assert(cache_queue_head == NULL); - RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); } - - journal_done(); + pthread_mutex_unlock (&cache_lock); return (NULL); } /* }}} void *queue_thread_main */ @@ -809,6 +1005,72 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ return (0); } /* }}} int buffer_get_field */ +/* if we're restricting writes to the base directory, + * check whether the file falls within the dir + * returns 1 if OK, otherwise 0 + */ +static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ +{ + assert(file != NULL); + + if (!config_write_base_only + || sock == NULL /* journal replay */ + || config_base_dir == NULL) + return 1; + + if (strstr(file, "../") != NULL) goto err; + + /* relative paths without "../" are ok */ + if (*file != '/') return 1; + + /* file must be of the format base + "/" + <1+ char filename> */ + if (strlen(file) < _config_base_dir_len + 2) goto err; + if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err; + if (*(file + _config_base_dir_len) != '/') goto err; + + return 1; + +err: + if (sock != NULL && sock->fd >= 0) + send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); + + return 0; +} /* }}} static int check_file_access */ + +/* when using a base dir, convert relative paths to absolute paths. + * if necessary, modifies the "filename" pointer to point + * to the new path created in "tmp". "tmp" is provided + * by the caller and sizeof(tmp) must be >= PATH_MAX. + * + * this allows us to optimize for the expected case (absolute path) + * with a no-op. + */ +static void get_abs_path(char **filename, char *tmp) +{ + assert(tmp != NULL); + assert(filename != NULL && *filename != NULL); + + if (config_base_dir == NULL || **filename == '/') + return; + + snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename); + *filename = tmp; +} /* }}} static int get_abs_path */ + +/* returns 1 if we have the required privilege level, + * otherwise issue an error to the user on sock */ +static int has_privilege (listen_socket_t *sock, /* {{{ */ + socket_privilege priv) +{ + if (sock == NULL) /* journal replay */ + return 1; + + if (sock->privilege >= priv) + return 1; + + return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); +} /* }}} static int has_privilege */ + static int flush_file (const char *filename) /* {{{ */ { cache_item_t *ci; @@ -829,131 +1091,26 @@ static int flush_file (const char *filename) /* {{{ */ pthread_cond_wait(&ci->flushed, &cache_lock); } + /* DO NOT DO ANYTHING WITH ci HERE!! The entry + * may have been purged during our cond_wait() */ + pthread_mutex_unlock(&cache_lock); return (0); } /* }}} int flush_file */ -static int handle_request_help (int fd, /* {{{ */ - char *buffer, size_t buffer_size) +static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */ { - int status; - char **help_text; - size_t help_text_len; - char *command; - size_t i; - - char *help_help[] = - { - "5 Command overview\n", - "FLUSH \n", - "FLUSHALL\n", - "HELP []\n", - "UPDATE [ ...]\n", - "STATS\n" - }; - size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]); - - char *help_flush[] = - { - "4 Help for FLUSH\n", - "Usage: FLUSH \n", - "\n", - "Adds the given filename to the head of the update queue and returns\n", - "after is has been dequeued.\n" - }; - size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]); - - char *help_flushall[] = - { - "3 Help for FLUSHALL\n", - "Usage: FLUSHALL\n", - "\n", - "Triggers writing of all pending updates. Returns immediately.\n" - }; - size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]); - - char *help_update[] = - { - "9 Help for UPDATE\n", - "Usage: UPDATE [ ...]\n" - "\n", - "Adds the given file to the internal cache if it is not yet known and\n", - "appends the given value(s) to the entry. See the rrdcached(1) manpage\n", - "for details.\n", - "\n", - "Each has the following form:\n", - " =