X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=4c3d7ed4653d30256c35909a15cc7b1c7e2fb285;hp=dcca55d755f53d52513d3e1ba1f9c3a08e5d76bd;hb=cea28dc31a16fcc1f1b8b95aee3fada2fbd2aca0;hpb=18ff159c38b7d919bafbdcc46d03c3f83c17476f diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index dcca55d..4c3d7ed 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,7 +1,7 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008 Florian octo Forster - * Copyright (C) 2008 Kevin Brintnall + * Copyright (C) 2008-2010 Florian octo Forster + * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -73,15 +73,18 @@ #include "rrd.h" #include "rrd_client.h" +#include "unused.h" #include #ifndef WIN32 -#include +#ifdef HAVE_STDINT_H +# include +#endif #include #include #include -# include +#include #else @@ -91,6 +94,7 @@ #include #include +#include #include #include #include @@ -102,25 +106,22 @@ #include #include #include +#include +#include #include /* }}} */ -#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) - -#ifndef __GNUC__ -# define __attribute__(x) /**/ -#endif +#define RRDD_LOG(severity, ...) \ + do { \ + if (stay_foreground) \ + fprintf(stderr, __VA_ARGS__); \ + syslog ((severity), __VA_ARGS__); \ + } while (0) /* * Types */ -typedef enum -{ - PRIV_LOW, - PRIV_HIGH -} socket_privilege; - typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code; struct listen_socket_s @@ -128,7 +129,6 @@ struct listen_socket_s int fd; char addr[PATH_MAX + 1]; int family; - socket_privilege privilege; /* state for BATCH processing */ time_t batch_start; @@ -141,23 +141,28 @@ struct listen_socket_s char *wbuf; ssize_t wbuf_len; + + uint32_t permissions; + + gid_t socket_group; + mode_t socket_permissions; }; typedef struct listen_socket_s listen_socket_t; -struct command; +struct command_s; +typedef struct command_s command_t; /* note: guard against "unused" warnings in the handlers */ -#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\ - time_t now __attribute__((unused)),\ - char *buffer __attribute__((unused)),\ - size_t buffer_size __attribute__((unused)) +#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\ + time_t UNUSED(now),\ + char UNUSED(*buffer),\ + size_t UNUSED(buffer_size) -#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\ +#define HANDLER_PROTO command_t UNUSED(*cmd),\ DISPATCH_PROTO -struct command { +struct command_s { char *cmd; int (*handler)(HANDLER_PROTO); - socket_privilege min_priv; char context; /* where we expect to see it */ #define CMD_CONTEXT_CLIENT (1<<0) @@ -175,7 +180,8 @@ struct cache_item_s { char *file; char **values; - int values_num; + size_t values_num; /* number of valid pointers */ + size_t values_alloc; /* number of allocated pointers */ time_t last_flush_time; time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) @@ -202,6 +208,12 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* describe a set of journal files */ +typedef struct { + char **files; + size_t files_num; +} journal_set; + /* max length of socket command or response */ #define CMD_MAX 4096 #define RBUF_SIZE (CMD_MAX*2) @@ -215,7 +227,11 @@ static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; -static int do_shutdown = 0; +enum { + RUNNING, /* normal operation */ + FLUSHING, /* flushing remaining values */ + SHUTDOWN /* shutting down */ +} state = RUNNING; static pthread_t *queue_threads; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -242,9 +258,10 @@ static char *config_pid_file = NULL; static char *config_base_dir = NULL; static size_t _config_base_dir_len = 0; static int config_write_base_only = 0; +static size_t config_alloc_chunk = 1; static listen_socket_t **config_listen_address_list = NULL; -static int config_listen_address_list_len = 0; +static size_t config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; static uint64_t stats_updates_received = 0; @@ -256,9 +273,14 @@ static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* Journaled updates */ -static char *journal_cur = NULL; -static char *journal_old = NULL; -static FILE *journal_fh = NULL; +#define JOURNAL_REPLAY(s) ((s) == NULL) +#define JOURNAL_BASE "rrd.journal" +static journal_set *journal_cur = NULL; +static journal_set *journal_old = NULL; +static char *journal_dir = NULL; +static FILE *journal_fh = NULL; /* current journal file handle */ +static long journal_size = 0; /* current journal size */ +#define JOURNAL_MAX (1 * 1024 * 1024 * 1024) static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; static int journal_write(char *cmd, char *args); static void journal_done(void); @@ -273,28 +295,28 @@ static int handle_request_help (HANDLER_PROTO); static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - do_shutdown++; + state = FLUSHING; pthread_cond_broadcast(&flush_cond); pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ -static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_int_handler (int UNUSED(s)) /* {{{ */ { sig_common("INT"); } /* }}} void sig_int_handler */ -static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_term_handler (int UNUSED(s)) /* {{{ */ { sig_common("TERM"); } /* }}} void sig_term_handler */ -static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr1_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 1; sig_common("USR1"); } /* }}} void sig_usr1_handler */ -static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr2_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 0; sig_common("USR2"); @@ -336,12 +358,32 @@ static void install_signal_handlers(void) /* {{{ */ static int open_pidfile(char *action, int oflag) /* {{{ */ { int fd; - char *file; + const char *file; + char *file_copy, *dir; file = (config_pid_file != NULL) ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; + /* dirname may modify its argument */ + file_copy = strdup(file); + if (file_copy == NULL) + { + fprintf(stderr, "rrdcached: strdup(): %s\n", + rrd_strerror(errno)); + return -1; + } + + dir = dirname(file_copy); + if (rrd_mkdir_p(dir, 0777) != 0) + { + fprintf(stderr, "Failed to create pidfile directory '%s': %s\n", + dir, rrd_strerror(errno)); + return -1; + } + + free(file_copy); + fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", @@ -379,7 +421,13 @@ static int check_pidfile(void) } lseek(pid_fd, 0, SEEK_SET); - ftruncate(pid_fd, 0); + if (ftruncate(pid_fd, 0) == -1) + { + fprintf(stderr, + "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid); + close(pid_fd); + return -1; + } fprintf(stderr, "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" @@ -458,7 +506,7 @@ static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */ /* NOTREACHED */ assert(1==0); -} +} /* }}} char *next_cmd */ /* add the characters directly to the write buffer */ static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ @@ -489,7 +537,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ char buffer[CMD_MAX]; int len; - if (sock == NULL) return 0; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return 0; if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); @@ -536,7 +584,7 @@ static int send_response (listen_socket_t *sock, response_code rc, ssize_t wrote; int rclen, len; - if (sock == NULL) return rc; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return rc; if (sock->batch_start) { @@ -598,10 +646,11 @@ static void wipe_ci_values(cache_item_t *ci, time_t when) { ci->values = NULL; ci->values_num = 0; + ci->values_alloc = 0; ci->last_flush_time = when; if (config_write_jitter > 0) - ci->last_flush_time += (random() % config_write_jitter); + ci->last_flush_time += (rrd_random() % config_write_jitter); } /* remove_from_queue @@ -642,7 +691,7 @@ static void *free_cache_item(cache_item_t *ci) /* {{{ */ remove_from_queue(ci); - for (int i=0; i < ci->values_num; i++) + for (size_t i=0; i < ci->values_num; i++) free(ci->values[i]); free (ci->values); @@ -650,6 +699,7 @@ static void *free_cache_item(cache_item_t *ci) /* {{{ */ /* in case anyone is waiting */ pthread_cond_broadcast(&ci->flushed); + pthread_cond_destroy(&ci->flushed); free (ci); @@ -732,33 +782,20 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ if (ci->flags & CI_FLAGS_IN_QUEUE) return FALSE; - if ((ci->last_flush_time <= cfd->abs_timeout) - && (ci->values_num > 0)) - { - enqueue_cache_item (ci, TAIL); - } - else if ((do_shutdown != 0) - && (ci->values_num > 0)) + if (ci->values_num > 0 + && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING)) { enqueue_cache_item (ci, TAIL); } else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) && (ci->values_num <= 0)) { - char **temp; - - temp = (char **) rrd_realloc (cfd->keys, - sizeof (char *) * (cfd->keys_num + 1)); - if (temp == NULL) + assert ((char *) key == ci->file); + if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key)) { - RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed."); + RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed."); return (FALSE); } - cfd->keys = temp; - /* Make really sure this points to the _same_ place */ - assert ((char *) key == ci->file); - cfd->keys[cfd->keys_num] = (char *) key; - cfd->keys_num++; } return (FALSE); @@ -789,9 +826,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { + gboolean status = g_tree_remove(cache_tree, cfd.keys[k]); /* should never fail, since we have held the cache_lock * the entire time */ - assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE ); + assert(status == TRUE); } if (cfd.keys != NULL) @@ -803,7 +841,7 @@ static int flush_old_values (int max_age) return (0); } /* int flush_old_values */ -static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *flush_thread_main (void UNUSED(*args)) /* {{{ */ { struct timeval now; struct timespec next_flush; @@ -815,21 +853,22 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock(&cache_lock); - while (!do_shutdown) + while (state == RUNNING) { gettimeofday (&now, NULL); if ((now.tv_sec > next_flush.tv_sec) || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { + RRDD_LOG(LOG_DEBUG, "flushing old values"); + + /* Determine the time of the next cache flush. */ + next_flush.tv_sec = now.tv_sec + config_flush_interval; + /* Flush all values that haven't been written in the last * `config_write_interval' seconds. */ flush_old_values (config_write_interval); - /* Determine the time of the next cache flush. */ - next_flush.tv_sec = - now.tv_sec + next_flush.tv_sec % config_flush_interval; - /* unlock the cache while we rotate so we don't block incoming * updates if the fsync() blocks on disk I/O */ pthread_mutex_unlock(&cache_lock); @@ -848,28 +887,29 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ if (config_flush_at_shutdown) flush_old_values (-1); /* flush everything */ + state = SHUTDOWN; + pthread_mutex_unlock(&cache_lock); return NULL; } /* void *flush_thread_main */ -static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *queue_thread_main (void UNUSED(*args)) /* {{{ */ { pthread_mutex_lock (&cache_lock); - while (!do_shutdown + while (state != SHUTDOWN || (cache_queue_head != NULL && config_flush_at_shutdown)) { cache_item_t *ci; char *file; char **values; - int values_num; + size_t values_num; int status; - int i; /* Now, check if there's something to store away. If not, wait until - * something comes in. if we are shutting down, do not wait around. */ - if (cache_queue_head == NULL && !do_shutdown) + * something comes in. */ + if (cache_queue_head == NULL) { status = pthread_cond_wait (&queue_cond, &cache_lock); if ((status != 0) && (status != ETIMEDOUT)) @@ -906,7 +946,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); rrd_clear_error (); - status = rrd_update_r (file, NULL, values_num, (void *) values); + status = rrd_update_r (file, NULL, (int) values_num, (void *) values); if (status != 0) { RRDD_LOG (LOG_NOTICE, "queue_thread_main: " @@ -915,13 +955,14 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } journal_write("wrote", file); - pthread_cond_broadcast(&ci->flushed); - for (i = 0; i < values_num; i++) - free (values[i]); - - free(values); - free(file); + /* Search again in the tree. It's possible someone issued a "FORGET" + * while we were writing the update values. */ + pthread_mutex_lock(&cache_lock); + ci = (cache_item_t *) g_tree_lookup(cache_tree, file); + if (ci) + pthread_cond_broadcast(&ci->flushed); + pthread_mutex_unlock(&cache_lock); if (status == 0) { @@ -931,6 +972,9 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&stats_lock); } + rrd_free_ptrs((void ***) &values, &values_num); + free(file); + pthread_mutex_lock (&cache_lock); } pthread_mutex_unlock (&cache_lock); @@ -1010,7 +1054,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ assert(file != NULL); if (!config_write_base_only - || sock == NULL /* journal replay */ + || JOURNAL_REPLAY(sock) || config_base_dir == NULL) return 1; @@ -1053,20 +1097,6 @@ static void get_abs_path(char **filename, char *tmp) *filename = tmp; } /* }}} static int get_abs_path */ -/* returns 1 if we have the required privilege level, - * otherwise issue an error to the user on sock */ -static int has_privilege (listen_socket_t *sock, /* {{{ */ - socket_privilege priv) -{ - if (sock == NULL) /* journal replay */ - return 1; - - if (sock->privilege >= priv) - return 1; - - return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); -} /* }}} static int has_privilege */ - static int flush_file (const char *filename) /* {{{ */ { cache_item_t *ci; @@ -1095,7 +1125,7 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ -static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */ +static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */ { char *err = "Syntax error.\n"; @@ -1227,7 +1257,7 @@ static int handle_request_pending(HANDLER_PROTO) /* {{{ */ return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT)); } - for (int i=0; i < ci->values_num; i++) + for (size_t i=0; i < ci->values_num; i++) add_response_info(sock, "%s\n", ci->values[i]); pthread_mutex_unlock(&cache_lock); @@ -1253,7 +1283,7 @@ static int handle_request_forget(HANDLER_PROTO) /* {{{ */ if (found == TRUE) { - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("forget", file); return send_response(sock, RESP_OK, "Gone!\n"); @@ -1293,7 +1323,8 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ cache_item_t *ci; /* save it for the journal later */ - strncpy(orig_buf, buffer, sizeof(orig_buf)-1); + if (!JOURNAL_REPLAY(sock)) + strncpy(orig_buf, buffer, buffer_size); status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -1312,6 +1343,7 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ if (ci == NULL) /* {{{ */ { struct stat statbuf; + cache_item_t *tmp; /* don't hold the lock while we setup; stat(2) might block */ pthread_mutex_unlock(&cache_lock); @@ -1359,17 +1391,29 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ pthread_cond_init(&ci->flushed, NULL); pthread_mutex_lock(&cache_lock); - g_tree_replace (cache_tree, (void *) ci->file, (void *) ci); + + /* another UPDATE might have added this entry in the meantime */ + tmp = g_tree_lookup (cache_tree, file); + if (tmp == NULL) + g_tree_replace (cache_tree, (void *) ci->file, (void *) ci); + else + { + free_cache_item (ci); + ci = tmp; + } + + /* state may have changed while we were unlocked */ + if (state == SHUTDOWN) + return -1; } /* }}} */ assert (ci != NULL); /* don't re-write updates in replay mode */ - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("update", orig_buf); while (buffer_size > 0) { - char **temp; char *value; time_t stamp; char *eostamp; @@ -1400,22 +1444,12 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ else ci->last_update_stamp = stamp; - temp = (char **) rrd_realloc (ci->values, - sizeof (char *) * (ci->values_num + 1)); - if (temp == NULL) - { - RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed."); - continue; - } - ci->values = temp; - - ci->values[ci->values_num] = strdup (value); - if (ci->values[ci->values_num] == NULL) + if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value, + &ci->values_alloc, config_alloc_chunk)) { - RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); + RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed."); continue; } - ci->values_num++; values_num++; } @@ -1440,12 +1474,202 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ } /* }}} int handle_request_update */ +static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ +{ + char *file, file_tmp[PATH_MAX]; + char *cf; + + char *start_str; + char *end_str; + time_t start_tm; + time_t end_tm; + + unsigned long step; + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + int status; + unsigned long i; + time_t t; + rrd_value_t *data_ptr; + + file = NULL; + cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do /* while (0) */ + { + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &cf); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &start_str); + if (status != 0) + { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field (&buffer, &buffer_size, &end_str); + if (status != 0) + { + end_str = NULL; + status = 0; + break; + } + } while (0); + + if (status != 0) + return (syntax_error(sock,cmd)); + + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) return 0; + + status = flush_file (file); + if ((status != 0) && (status != ENOENT)) + return (send_response (sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", file, status)); + + t = time (NULL); /* "now" */ + + /* Parse start time */ + if (start_str != NULL) + { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol (start_str, &endptr, /* base = */ 0); + if ((endptr == start_str) || (errno != 0)) + return (send_response(sock, RESP_ERR, + "Cannot parse start time `%s': Only simple integers are allowed.\n", + start_str)); + + if (value > 0) + start_tm = (time_t) value; + else + start_tm = (time_t) (t + value); + } + else + { + start_tm = t - 86400; + } + + /* Parse end time */ + if (end_str != NULL) + { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol (end_str, &endptr, /* base = */ 0); + if ((endptr == end_str) || (errno != 0)) + return (send_response(sock, RESP_ERR, + "Cannot parse end time `%s': Only simple integers are allowed.\n", + end_str)); + + if (value > 0) + end_tm = (time_t) value; + else + end_tm = (time_t) (t + value); + } + else + { + end_tm = t; + } + + step = -1; + ds_cnt = 0; + ds_namv = NULL; + data = NULL; + + status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step, + &ds_cnt, &ds_namv, &data); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error ())); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) end_tm); + add_response_info (sock, "Step: %lu\n", step); + add_response_info (sock, "DSCount: %lu\n", ds_cnt); + +#define SSTRCAT(buffer,str,buffer_fill) do { \ + size_t str_len = strlen (str); \ + if ((buffer_fill + str_len) > sizeof (buffer)) \ + str_len = sizeof (buffer) - buffer_fill; \ + if (str_len > 0) { \ + strncpy (buffer + buffer_fill, str, str_len); \ + buffer_fill += str_len; \ + assert (buffer_fill <= sizeof (buffer)); \ + if (buffer_fill == sizeof (buffer)) \ + buffer[buffer_fill - 1] = 0; \ + else \ + buffer[buffer_fill] = 0; \ + } \ + } while (0) + + { /* Add list of DS names */ + char linebuf[1024]; + size_t linebuf_fill; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + if (i > 0) + SSTRCAT (linebuf, " ", linebuf_fill); + SSTRCAT (linebuf, ds_namv[i], linebuf_fill); + rrd_freemem(ds_namv[i]); + } + rrd_freemem(ds_namv); + add_response_info (sock, "DSName: %s\n", linebuf); + } + + /* Add the actual data */ + assert (step > 0); + data_ptr = data; + for (t = start_tm + step; t <= end_tm; t += step) + { + char linebuf[1024]; + size_t linebuf_fill; + char tmp[128]; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr); + tmp[sizeof (tmp) - 1] = 0; + SSTRCAT (linebuf, tmp, linebuf_fill); + + data_ptr++; + } + + add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); + } /* for (t) */ + rrd_freemem(data); + + return (send_response (sock, RESP_OK, "Success\n")); +#undef SSTRCAT +} /* }}} int handle_request_fetch */ + /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ static int handle_request_wrote (HANDLER_PROTO) /* {{{ */ { - int i; cache_item_t *ci; const char *file = buffer; @@ -1459,12 +1683,7 @@ static int handle_request_wrote (HANDLER_PROTO) /* {{{ */ } if (ci->values) - { - for (i=0; i < ci->values_num; i++) - free(ci->values[i]); - - free(ci->values); - } + rrd_free_ptrs((void ***) &ci->values, &ci->values_num); wipe_ci_values(ci, now); remove_from_queue(ci); @@ -1502,11 +1721,10 @@ static int handle_request_quit (HANDLER_PROTO) /* {{{ */ return -1; } /* }}} static int handle_request_quit */ -struct command COMMANDS[] = { +static command_t list_of_commands[] = { /* {{{ */ { "UPDATE", handle_request_update, - PRIV_HIGH, CMD_CONTEXT_ANY, "UPDATE [ ...]\n" , @@ -1521,7 +1739,6 @@ struct command COMMANDS[] = { { "WROTE", handle_request_wrote, - PRIV_HIGH, CMD_CONTEXT_JOURNAL, NULL, NULL @@ -1529,7 +1746,6 @@ struct command COMMANDS[] = { { "FLUSH", handle_request_flush, - PRIV_LOW, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, "FLUSH \n" , @@ -1539,7 +1755,6 @@ struct command COMMANDS[] = { { "FLUSHALL", handle_request_flushall, - PRIV_HIGH, CMD_CONTEXT_CLIENT, "FLUSHALL\n" , @@ -1548,7 +1763,6 @@ struct command COMMANDS[] = { { "PENDING", handle_request_pending, - PRIV_HIGH, CMD_CONTEXT_CLIENT, "PENDING \n" , @@ -1558,7 +1772,6 @@ struct command COMMANDS[] = { { "FORGET", handle_request_forget, - PRIV_HIGH, CMD_CONTEXT_ANY, "FORGET \n" , @@ -1568,7 +1781,6 @@ struct command COMMANDS[] = { { "QUEUE", handle_request_queue, - PRIV_LOW, CMD_CONTEXT_CLIENT, "QUEUE\n" , @@ -1581,7 +1793,6 @@ struct command COMMANDS[] = { { "STATS", handle_request_stats, - PRIV_LOW, CMD_CONTEXT_CLIENT, "STATS\n" , @@ -1591,7 +1802,6 @@ struct command COMMANDS[] = { { "HELP", handle_request_help, - PRIV_LOW, CMD_CONTEXT_CLIENT, "HELP []\n", NULL, /* special! */ @@ -1599,7 +1809,6 @@ struct command COMMANDS[] = { { "BATCH", batch_start, - PRIV_LOW, CMD_CONTEXT_CLIENT, "BATCH\n" , @@ -1623,44 +1832,99 @@ struct command COMMANDS[] = { { ".", /* BATCH terminator */ batch_done, - PRIV_LOW, CMD_CONTEXT_BATCH, NULL, NULL }, { + "FETCH", + handle_request_fetch, + CMD_CONTEXT_CLIENT, + "FETCH [ []]\n" + , + "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n" + }, + { "QUIT", handle_request_quit, - PRIV_LOW, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, "QUIT\n" , "Disconnect from rrdcached.\n" - }, - {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */ -}; + } +}; /* }}} command_t list_of_commands[] */ +static size_t list_of_commands_len = sizeof (list_of_commands) + / sizeof (list_of_commands[0]); -static struct command *find_command(char *cmd) +static command_t *find_command(char *cmd) { - struct command *c = COMMANDS; - - while (c->cmd != NULL) - { - if (strcasecmp(cmd, c->cmd) == 0) - break; - c++; - } + size_t i; - if (c->cmd == NULL) - return NULL; - else - return c; + for (i = 0; i < list_of_commands_len; i++) + if (strcasecmp(cmd, list_of_commands[i].cmd) == 0) + return (&list_of_commands[i]); + return NULL; } +/* We currently use the index in the `list_of_commands' array as a bit position + * in `listen_socket_t.permissions'. This member schould NEVER be accessed from + * outside these functions so that switching to a more elegant storage method + * is easily possible. */ +static ssize_t find_command_index (const char *cmd) /* {{{ */ +{ + size_t i; + + for (i = 0; i < list_of_commands_len; i++) + if (strcasecmp(cmd, list_of_commands[i].cmd) == 0) + return ((ssize_t) i); + return (-1); +} /* }}} ssize_t find_command_index */ + +static int socket_permission_check (listen_socket_t *sock, /* {{{ */ + const char *cmd) +{ + ssize_t i; + + if (JOURNAL_REPLAY(sock)) + return (1); + + if (cmd == NULL) + return (-1); + + if ((strcasecmp ("QUIT", cmd) == 0) + || (strcasecmp ("HELP", cmd) == 0)) + return (1); + else if (strcmp (".", cmd) == 0) + cmd = "BATCH"; + + i = find_command_index (cmd); + if (i < 0) + return (-1); + assert (i < 32); + + if ((sock->permissions & (1 << i)) != 0) + return (1); + return (0); +} /* }}} int socket_permission_check */ + +static int socket_permission_add (listen_socket_t *sock, /* {{{ */ + const char *cmd) +{ + ssize_t i; + + i = find_command_index (cmd); + if (i < 0) + return (-1); + assert (i < 32); + + sock->permissions |= (1 << i); + return (0); +} /* }}} int socket_permission_add */ + /* check whether commands are received in the expected context */ -static int command_check_context(listen_socket_t *sock, struct command *cmd) +static int command_check_context(listen_socket_t *sock, command_t *cmd) { - if (sock == NULL) + if (JOURNAL_REPLAY(sock)) return (cmd->context & CMD_CONTEXT_JOURNAL); else if (sock->batch_start) return (cmd->context & CMD_CONTEXT_BATCH); @@ -1676,7 +1940,7 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ int status; char *cmd_str; char *resp_txt; - struct command *help = NULL; + command_t *help = NULL; status = buffer_get_field (&buffer, &buffer_size, &cmd_str); if (status == 0) @@ -1697,26 +1961,26 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ } else { - help = COMMANDS; + size_t i; + resp_txt = "Command overview\n"; - while (help->cmd) + for (i = 0; i < list_of_commands_len; i++) { - if (help->syntax) - add_response_info(sock, "%s", help->syntax); - help++; + if (list_of_commands[i].syntax == NULL) + continue; + add_response_info (sock, "%s", list_of_commands[i].syntax); } } return send_response(sock, RESP_OK, resp_txt); } /* }}} int handle_request_help */ -/* if sock==NULL, we are in journal replay mode */ static int handle_request (DISPATCH_PROTO) /* {{{ */ { char *buffer_ptr = buffer; char *cmd_str = NULL; - struct command *cmd = NULL; + command_t *cmd = NULL; int status; assert (buffer[buffer_size - 1] == '\0'); @@ -1735,9 +1999,8 @@ static int handle_request (DISPATCH_PROTO) /* {{{ */ if (!cmd) return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str); - status = has_privilege(sock, cmd->min_priv); - if (status <= 0) - return status; + if (!socket_permission_check (sock, cmd->cmd)) + return send_response(sock, RESP_ERR, "Permission denied.\n"); if (!command_check_context(sock, cmd)) return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str); @@ -1745,74 +2008,136 @@ static int handle_request (DISPATCH_PROTO) /* {{{ */ return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size); } /* }}} int handle_request */ -/* MUST NOT hold journal_lock before calling this */ -static void journal_rotate(void) /* {{{ */ +static void journal_set_free (journal_set *js) /* {{{ */ { - FILE *old_fh = NULL; - int new_fd; - - if (journal_cur == NULL || journal_old == NULL) + if (js == NULL) return; - pthread_mutex_lock(&journal_lock); + rrd_free_ptrs((void ***) &js->files, &js->files_num); - /* we rotate this way (rename before close) so that the we can release - * the journal lock as fast as possible. Journal writes to the new - * journal can proceed immediately after the new file is opened. The - * fclose can then block without affecting new updates. - */ - if (journal_fh != NULL) + free(js); +} /* }}} journal_set_free */ + +static void journal_set_remove (journal_set *js) /* {{{ */ +{ + if (js == NULL) + return; + + for (uint i=0; i < js->files_num; i++) { - old_fh = journal_fh; - journal_fh = NULL; - rename(journal_cur, journal_old); - ++stats_journal_rotate; + RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]); + unlink(js->files[i]); } +} /* }}} journal_set_remove */ - new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND, - S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); - if (new_fd >= 0) +/* close current journal file handle. + * MUST hold journal_lock before calling */ +static void journal_close(void) /* {{{ */ +{ + if (journal_fh != NULL) { - journal_fh = fdopen(new_fd, "a"); - if (journal_fh == NULL) - close(new_fd); + if (fclose(journal_fh) != 0) + RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno)); } - pthread_mutex_unlock(&journal_lock); + journal_fh = NULL; + journal_size = 0; +} /* }}} journal_close */ + +/* MUST hold journal_lock before calling */ +static void journal_new_file(void) /* {{{ */ +{ + struct timeval now; + int new_fd; + char new_file[PATH_MAX + 1]; + + assert(journal_dir != NULL); + assert(journal_cur != NULL); - if (old_fh != NULL) - fclose(old_fh); + journal_close(); + gettimeofday(&now, NULL); + /* this format assures that the files sort in strcmp() order */ + snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d", + journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec); + + new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND, + S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); + if (new_fd < 0) + goto error; + + journal_fh = fdopen(new_fd, "a"); if (journal_fh == NULL) - { - RRDD_LOG(LOG_CRIT, - "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)", - journal_cur, rrd_strerror(errno)); + goto error; - RRDD_LOG(LOG_ERR, - "JOURNALING DISABLED: All values will be flushed at shutdown"); - config_flush_at_shutdown = 1; - } + journal_size = ftell(journal_fh); + RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file); + + /* record the file in the journal set */ + rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file); + + return; + +error: + RRDD_LOG(LOG_CRIT, + "JOURNALING DISABLED: Error while trying to create %s : %s", + new_file, rrd_strerror(errno)); + RRDD_LOG(LOG_CRIT, + "JOURNALING DISABLED: All values will be flushed at shutdown"); + + close(new_fd); + config_flush_at_shutdown = 1; + +} /* }}} journal_new_file */ + +/* MUST NOT hold journal_lock before calling this */ +static void journal_rotate(void) /* {{{ */ +{ + journal_set *old_js = NULL; + + if (journal_dir == NULL) + return; + + RRDD_LOG(LOG_DEBUG, "rotating journals"); + + pthread_mutex_lock(&stats_lock); + ++stats_journal_rotate; + pthread_mutex_unlock(&stats_lock); + + pthread_mutex_lock(&journal_lock); + + journal_close(); + + /* rotate the journal sets */ + old_js = journal_old; + journal_old = journal_cur; + journal_cur = calloc(1, sizeof(journal_set)); + + if (journal_cur != NULL) + journal_new_file(); + else + RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n"); + + pthread_mutex_unlock(&journal_lock); + + journal_set_remove(old_js); + journal_set_free (old_js); } /* }}} static void journal_rotate */ +/* MUST hold journal_lock when calling */ static void journal_done(void) /* {{{ */ { if (journal_cur == NULL) return; - pthread_mutex_lock(&journal_lock); - if (journal_fh != NULL) - { - fclose(journal_fh); - journal_fh = NULL; - } + journal_close(); if (config_flush_at_shutdown) { RRDD_LOG(LOG_INFO, "removing journals"); - unlink(journal_old); - unlink(journal_cur); + journal_set_remove(journal_old); + journal_set_remove(journal_cur); } else { @@ -1820,7 +2145,9 @@ static void journal_done(void) /* {{{ */ "journals will be used at next startup"); } - pthread_mutex_unlock(&journal_lock); + journal_set_free(journal_cur); + journal_set_free(journal_old); + free(journal_dir); } /* }}} static void journal_done */ @@ -1833,6 +2160,11 @@ static int journal_write(char *cmd, char *args) /* {{{ */ pthread_mutex_lock(&journal_lock); chars = fprintf(journal_fh, "%s %s\n", cmd, args); + journal_size += chars; + + if (journal_size > JOURNAL_MAX) + journal_new_file(); + pthread_mutex_unlock(&journal_lock); if (chars > 0) @@ -1864,9 +2196,6 @@ static int journal_replay (const char *file) /* {{{ */ memset(&statbuf, 0, sizeof(statbuf)); if (stat(file, &statbuf) != 0) { - if (errno == ENOENT) - return 0; - reason = "stat error"; status = errno; } @@ -1942,25 +2271,83 @@ static int journal_replay (const char *file) /* {{{ */ return entry_cnt > 0 ? 1 : 0; } /* }}} static int journal_replay */ +static int journal_sort(const void *v1, const void *v2) +{ + char **jn1 = (char **) v1; + char **jn2 = (char **) v2; + + return strcmp(*jn1,*jn2); +} + static void journal_init(void) /* {{{ */ { int had_journal = 0; + DIR *dir; + struct dirent *dent; + char path[PATH_MAX+1]; - if (journal_cur == NULL) return; + if (journal_dir == NULL) return; pthread_mutex_lock(&journal_lock); + journal_cur = calloc(1, sizeof(journal_set)); + if (journal_cur == NULL) + { + RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n"); + return; + } + RRDD_LOG(LOG_INFO, "checking for journal files"); - had_journal += journal_replay(journal_old); - had_journal += journal_replay(journal_cur); + /* Handle old journal files during transition. This gives them the + * correct sort order. TODO: remove after first release + */ + { + char old_path[PATH_MAX+1]; + snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" ); + snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000"); + rename(old_path, path); + + snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ); + snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001"); + rename(old_path, path); + } + + dir = opendir(journal_dir); + if (!dir) { + RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir); + return; + } + while ((dent = readdir(dir)) != NULL) + { + /* looks like a journal file? */ + if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE))) + continue; + + snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name); + + if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path)) + { + RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!", + dent->d_name); + break; + } + } + closedir(dir); + + qsort(journal_cur->files, journal_cur->files_num, + sizeof(journal_cur->files[0]), journal_sort); + + for (uint i=0; i < journal_cur->files_num; i++) + had_journal += journal_replay(journal_cur->files[i]); + + journal_new_file(); /* it must have been a crash. start a flush */ if (had_journal && config_flush_at_shutdown) flush_old_values(-1); pthread_mutex_unlock(&journal_lock); - journal_rotate(); RRDD_LOG(LOG_INFO, "journal processing complete"); @@ -2009,7 +2396,7 @@ static void *connection_thread_main (void *args) /* {{{ */ connection_threads_num++; pthread_mutex_unlock (&connection_threads_lock); - while (do_shutdown == 0) + while (state == RUNNING) { char *cmd; ssize_t cmd_len; @@ -2024,7 +2411,7 @@ static void *connection_thread_main (void *args) /* {{{ */ pollfd.revents = 0; status = poll (&pollfd, 1, /* timeout = */ 500); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2091,11 +2478,31 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */ listen_socket_t *temp; int status; const char *path; + char *path_copy, *dir; path = sock->addr; if (strncmp(path, "unix:", strlen("unix:")) == 0) path += strlen("unix:"); + /* dirname may modify its argument */ + path_copy = strdup(path); + if (path_copy == NULL) + { + fprintf(stderr, "rrdcached: strdup(): %s\n", + rrd_strerror(errno)); + return (-1); + } + + dir = dirname(path_copy); + if (rrd_mkdir_p(dir, 0777) != 0) + { + fprintf(stderr, "Failed to create socket directory '%s': %s\n", + dir, rrd_strerror(errno)); + return (-1); + } + + free(path_copy); + temp = (listen_socket_t *) rrd_realloc (listen_fds, sizeof (listen_fds[0]) * (listen_fds_num + 1)); if (temp == NULL) @@ -2133,6 +2540,23 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */ return (-1); } + /* tweak the sockets group ownership */ + if (sock->socket_group != (gid_t)-1) + { + if ( (chown(path, getuid(), sock->socket_group) != 0) || + (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) ) + { + fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno)); + } + } + + if (sock->socket_permissions != (mode_t)-1) + { + if (chmod(path, sock->socket_permissions) != 0) + fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n", + (unsigned int)sock->socket_permissions, strerror(errno)); + } + status = listen (fd, /* backlog = */ 10); if (status != 0) { @@ -2198,8 +2622,8 @@ static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */ fprintf (stderr, "rrdcached: Garbage after address: %s\n", port); return (-1); } - } /* if (*addr = ']') */ - else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */ + } /* if (*addr == '[') */ + else { port = rindex(addr, ':'); if (port != NULL) @@ -2305,7 +2729,7 @@ static int close_listen_sockets (void) /* {{{ */ return (0); } /* }}} int close_listen_sockets */ -static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *listen_thread_main (void UNUSED(*args)) /* {{{ */ { struct pollfd *pollfds; int pollfds_num; @@ -2329,7 +2753,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ RRDD_LOG(LOG_INFO, "listening for connections"); - while (do_shutdown == 0) + while (state == RUNNING) { for (i = 0; i < pollfds_num; i++) { @@ -2339,7 +2763,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } status = poll (pollfds, pollfds_num, /* timeout = */ 1000); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2402,7 +2826,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } } /* for (pollfds_num) */ - } /* while (do_shutdown == 0) */ + } /* while (state == RUNNING) */ RRDD_LOG(LOG_INFO, "starting shutdown"); @@ -2434,13 +2858,11 @@ static int daemonize (void) /* {{{ */ /* open all the listen sockets */ if (config_listen_address_list_len > 0) { - for (int i = 0; i < config_listen_address_list_len; i++) - { + for (size_t i = 0; i < config_listen_address_list_len; i++) open_listen_socket (config_listen_address_list[i]); - free_listen_socket (config_listen_address_list[i]); - } - free(config_listen_address_list); + rrd_free_ptrs((void ***) &config_listen_address_list, + &config_listen_address_list_len); } else { @@ -2478,8 +2900,9 @@ static int daemonize (void) /* {{{ */ close (0); open ("/dev/null", O_RDWR); - dup (0); - dup (0); + if (dup(0) == -1 || dup(0) == -1){ + RRDD_LOG (LOG_ERR, "faild to run dup.\n"); + } } /* if (!stay_foreground) */ /* Change into the /tmp directory. */ @@ -2515,8 +2938,6 @@ error: static int cleanup (void) /* {{{ */ { - do_shutdown++; - pthread_cond_broadcast (&flush_cond); pthread_join (flush_thread, NULL); @@ -2530,21 +2951,21 @@ static int cleanup (void) /* {{{ */ RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); } - journal_done(); - remove_pidfile (); - free(queue_threads); free(config_base_dir); - free(config_pid_file); - free(journal_cur); - free(journal_old); pthread_mutex_lock(&cache_lock); g_tree_destroy(cache_tree); + pthread_mutex_lock(&journal_lock); + journal_done(); + RRDD_LOG(LOG_INFO, "goodbye"); closelog (); + remove_pidfile (); + free(config_pid_file); + return (0); } /* }}} int cleanup */ @@ -2553,7 +2974,13 @@ static int read_options (int argc, char **argv) /* {{{ */ int option; int status = 0; - while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1) + char **permissions = NULL; + size_t permissions_len = 0; + + gid_t socket_group = (gid_t)-1; + mode_t socket_permissions = (mode_t)-1; + + while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1) { switch (option) { @@ -2561,10 +2988,8 @@ static int read_options (int argc, char **argv) /* {{{ */ stay_foreground=1; break; - case 'L': case 'l': { - listen_socket_t **temp; listen_socket_t *new; new = malloc(sizeof(listen_socket_t)); @@ -2575,20 +3000,121 @@ static int read_options (int argc, char **argv) /* {{{ */ } memset(new, 0, sizeof(listen_socket_t)); - temp = (listen_socket_t **) rrd_realloc (config_listen_address_list, - sizeof (listen_socket_t *) * (config_listen_address_list_len + 1)); - if (temp == NULL) + strncpy(new->addr, optarg, sizeof(new->addr)-1); + + /* Add permissions to the socket {{{ */ + if (permissions_len != 0) + { + size_t i; + for (i = 0; i < permissions_len; i++) + { + status = socket_permission_add (new, permissions[i]); + if (status != 0) + { + fprintf (stderr, "read_options: Adding permission \"%s\" to " + "socket failed. Most likely, this permission doesn't " + "exist. Check your command line.\n", permissions[i]); + status = 4; + } + } + } + else /* if (permissions_len == 0) */ { - fprintf (stderr, "read_options: realloc failed.\n"); + /* Add permission for ALL commands to the socket. */ + size_t i; + for (i = 0; i < list_of_commands_len; i++) + { + status = socket_permission_add (new, list_of_commands[i].cmd); + if (status != 0) + { + fprintf (stderr, "read_options: Adding permission \"%s\" to " + "socket failed. This should never happen, ever! Sorry.\n", + permissions[i]); + status = 4; + } + } + } + /* }}} Done adding permissions. */ + + new->socket_group = socket_group; + new->socket_permissions = socket_permissions; + + if (!rrd_add_ptr((void ***)&config_listen_address_list, + &config_listen_address_list_len, new)) + { + fprintf(stderr, "read_options: rrd_add_ptr failed.\n"); return (2); } - config_listen_address_list = temp; + } + break; - strncpy(new->addr, optarg, sizeof(new->addr)-1); - new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW; + /* set socket group permissions */ + case 's': + { + gid_t group_gid; + struct group *grp; + + group_gid = strtoul(optarg, NULL, 10); + if (errno != EINVAL && group_gid>0) + { + /* we were passed a number */ + grp = getgrgid(group_gid); + } + else + { + grp = getgrnam(optarg); + } + + if (grp) + { + socket_group = grp->gr_gid; + } + else + { + /* no idea what the user wanted... */ + fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg); + return (5); + } + } + break; - temp[config_listen_address_list_len] = new; - config_listen_address_list_len++; + /* set socket file permissions */ + case 'm': + { + long tmp; + char *endptr = NULL; + + tmp = strtol (optarg, &endptr, 8); + if ((endptr == optarg) || (! endptr) || (*endptr != '\0') + || (tmp > 07777) || (tmp < 0)) { + fprintf (stderr, "read_options: Invalid file mode \"%s\".\n", + optarg); + return (5); + } + + socket_permissions = (mode_t)tmp; + } + break; + + case 'P': + { + char *optcopy; + char *saveptr; + char *dummy; + char *ptr; + + rrd_free_ptrs ((void *) &permissions, &permissions_len); + + optcopy = strdup (optarg); + dummy = optcopy; + saveptr = NULL; + while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL) + { + dummy = NULL; + rrd_add_strdup ((void *) &permissions, &permissions_len, ptr); + } + + free (optcopy); } break; @@ -2670,6 +3196,13 @@ static int read_options (int argc, char **argv) /* {{{ */ return (3); } + if (rrd_mkdir_p (config_base_dir, 0777) != 0) + { + fprintf (stderr, "Failed to create base directory '%s': %s\n", + config_base_dir, rrd_strerror (errno)); + return (3); + } + /* make sure that the base directory is not resolved via * symbolic links. this makes some performance-enhancing * assumptions possible (we don't have to resolve paths @@ -2677,17 +3210,8 @@ static int read_options (int argc, char **argv) /* {{{ */ */ if (realpath(config_base_dir, base_realpath) == NULL) { - fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir); - return 5; - } - else if (strncmp(config_base_dir, - base_realpath, sizeof(base_realpath)) != 0) - { - fprintf(stderr, - "Base directory (-b) resolved via file system links!\n" - "Please consult rrdcached '-b' documentation!\n" - "Consider specifying the real directory (%s)\n", - base_realpath); + fprintf (stderr, "Failed to canonicalize the base directory '%s': " + "%s\n", config_base_dir, rrd_strerror(errno)); return 5; } @@ -2705,6 +3229,24 @@ static int read_options (int argc, char **argv) /* {{{ */ } _config_base_dir_len = len; + + len = strlen (base_realpath); + while ((len > 0) && (base_realpath[len - 1] == '/')) + { + base_realpath[len - 1] = '\0'; + len--; + } + + if (strncmp(config_base_dir, + base_realpath, sizeof(base_realpath)) != 0) + { + fprintf(stderr, + "Base directory (-b) resolved via file system links!\n" + "Please consult rrdcached '-b' documentation!\n" + "Consider specifying the real directory (%s)\n", + base_realpath); + return 5; + } } break; @@ -2727,48 +3269,51 @@ static int read_options (int argc, char **argv) /* {{{ */ case 'j': { - struct stat statbuf; - const char *dir = optarg; + char journal_dir_actual[PATH_MAX]; + const char *dir; + dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual)); - status = stat(dir, &statbuf); + status = rrd_mkdir_p(dir, 0777); if (status != 0) { - fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno)); + fprintf(stderr, "Failed to create journal directory '%s': %s\n", + dir, rrd_strerror(errno)); return 6; } - if (!S_ISDIR(statbuf.st_mode) - || access(dir, R_OK|W_OK|X_OK) != 0) + if (access(dir, R_OK|W_OK|X_OK) != 0) { fprintf(stderr, "Must specify a writable directory with -j! (%s)\n", errno ? rrd_strerror(errno) : ""); return 6; } + } + break; - journal_cur = malloc(PATH_MAX + 1); - journal_old = malloc(PATH_MAX + 1); - if (journal_cur == NULL || journal_old == NULL) - { - fprintf(stderr, "malloc failure for journal files\n"); - return 6; - } - else + case 'a': + { + int temp = atoi(optarg); + if (temp > 0) + config_alloc_chunk = temp; + else { - snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir); - snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir); + fprintf(stderr, "Invalid allocation size: %s\n", optarg); + return 10; } } break; case 'h': case '?': - printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n" + printf ("RRDCacheD %s\n" + "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n" "\n" "Usage: rrdcached [options]\n" "\n" "Valid options are:\n" " -l
Socket address to listen to.\n" - " -L
Socket address to listen to ('FLUSH' only).\n" + " -P Sets the permissions to assign to all following " + "sockets\n" " -w Interval in which to write data.\n" " -z Delay writes up to seconds to spread load\n" " -t Number of write threads.\n" @@ -2779,12 +3324,21 @@ static int read_options (int argc, char **argv) /* {{{ */ " -g Do not fork and run in the foreground.\n" " -j Directory in which to create the journal files.\n" " -F Always flush all updates at shutdown\n" + " -s Group owner of all following UNIX sockets\n" + " (the socket will also have read/write permissions " + "for that group)\n" + " -m File permissions (octal) of all following UNIX " + "sockets\n" + " -a Memory allocation chunk size. Default is 1." "\n" "For more information and a detailed description of all options " "please refer\n" "to the rrdcached(1) manual page.\n", VERSION); - status = -1; + if (option == 'h') + status = -1; + else + status = 1; break; } /* switch (option) */ } /* while (getopt) */ @@ -2801,9 +3355,11 @@ static int read_options (int argc, char **argv) /* {{{ */ fprintf(stderr, "WARNING: -B does not make sense without -b!\n" " Consult the rrdcached documentation\n"); - if (journal_cur == NULL) + if (journal_dir == NULL) config_flush_at_shutdown = 1; + rrd_free_ptrs ((void *) &permissions, &permissions_len); + return (status); } /* }}} int read_options */