X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=2c81424a34d0426d8680b751f13221b9837d268c;hp=199ebdb13c1b0071cec8a55bc6db8f438442c703;hb=b63a6268ac7c3668f6731c0a0972e4575c3f6dcf;hpb=9d1a64da41154f4d2f6c803c572f6f7d2684d732 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 199ebdb..2c81424 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,7 +1,7 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008 Florian octo Forster - * Copyright (C) 2008 Kevin Brintnall + * Copyright (C) 2008-2010 Florian octo Forster + * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -63,25 +63,21 @@ * Now for some includes.. */ /* {{{ */ -#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H) -#include "../win32/config.h" -#else -#ifdef HAVE_CONFIG_H -#include "../rrd_config.h" -#endif -#endif -#include "rrd.h" +#include "rrd_tool.h" #include "rrd_client.h" +#include "unused.h" #include #ifndef WIN32 -#include +#ifdef HAVE_STDINT_H +# include +#endif #include #include #include -# include +#include #else @@ -103,25 +99,27 @@ #include #include #include +#include +#include + +#ifdef HAVE_LIBWRAP +#include +#endif /* HAVE_LIBWRAP */ #include /* }}} */ -#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) - -#ifndef __GNUC__ -# define __attribute__(x) /**/ -#endif +#define RRDD_LOG(severity, ...) \ + do { \ + if (stay_foreground) { \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\n"); } \ + syslog ((severity), __VA_ARGS__); \ + } while (0) /* * Types */ -typedef enum -{ - PRIV_LOW, - PRIV_HIGH -} socket_privilege; - typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code; struct listen_socket_s @@ -129,7 +127,6 @@ struct listen_socket_s int fd; char addr[PATH_MAX + 1]; int family; - socket_privilege privilege; /* state for BATCH processing */ time_t batch_start; @@ -142,23 +139,28 @@ struct listen_socket_s char *wbuf; ssize_t wbuf_len; + + uint32_t permissions; + + gid_t socket_group; + mode_t socket_permissions; }; typedef struct listen_socket_s listen_socket_t; -struct command; +struct command_s; +typedef struct command_s command_t; /* note: guard against "unused" warnings in the handlers */ -#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\ - time_t now __attribute__((unused)),\ - char *buffer __attribute__((unused)),\ - size_t buffer_size __attribute__((unused)) +#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\ + time_t UNUSED(now),\ + char UNUSED(*buffer),\ + size_t UNUSED(buffer_size) -#define HANDLER_PROTO struct command *cmd __attribute__((unused)),\ +#define HANDLER_PROTO command_t UNUSED(*cmd),\ DISPATCH_PROTO -struct command { +struct command_s { char *cmd; int (*handler)(HANDLER_PROTO); - socket_privilege min_priv; char context; /* where we expect to see it */ #define CMD_CONTEXT_CLIENT (1<<0) @@ -176,9 +178,10 @@ struct cache_item_s { char *file; char **values; - size_t values_num; + size_t values_num; /* number of valid pointers */ + size_t values_alloc; /* number of allocated pointers */ time_t last_flush_time; - time_t last_update_stamp; + double last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) #define CI_FLAGS_IN_QUEUE (1<<1) int flags; @@ -209,9 +212,7 @@ typedef struct { size_t files_num; } journal_set; -/* max length of socket command or response */ -#define CMD_MAX 4096 -#define RBUF_SIZE (CMD_MAX*2) +#define RBUF_SIZE (RRD_CMD_MAX*2) /* * Variables @@ -222,6 +223,8 @@ static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; +static listen_socket_t default_socket; + enum { RUNNING, /* normal operation */ FLUSHING, /* flushing remaining values */ @@ -253,6 +256,7 @@ static char *config_pid_file = NULL; static char *config_base_dir = NULL; static size_t _config_base_dir_len = 0; static int config_write_base_only = 0; +static size_t config_alloc_chunk = 1; static listen_socket_t **config_listen_address_list = NULL; static size_t config_listen_address_list_len = 0; @@ -266,7 +270,10 @@ static uint64_t stats_journal_bytes = 0; static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; +static int opt_no_overwrite = 0; /* default for the daemon */ + /* Journaled updates */ +#define JOURNAL_REPLAY(s) ((s) == NULL) #define JOURNAL_BASE "rrd.journal" static journal_set *journal_cur = NULL; static journal_set *journal_old = NULL; @@ -288,28 +295,30 @@ static int handle_request_help (HANDLER_PROTO); static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - state = FLUSHING; + if (state == RUNNING) { + state = FLUSHING; + } pthread_cond_broadcast(&flush_cond); pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ -static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_int_handler (int UNUSED(s)) /* {{{ */ { sig_common("INT"); } /* }}} void sig_int_handler */ -static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_term_handler (int UNUSED(s)) /* {{{ */ { sig_common("TERM"); } /* }}} void sig_term_handler */ -static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr1_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 1; sig_common("USR1"); } /* }}} void sig_usr1_handler */ -static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr2_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 0; sig_common("USR2"); @@ -351,12 +360,32 @@ static void install_signal_handlers(void) /* {{{ */ static int open_pidfile(char *action, int oflag) /* {{{ */ { int fd; - char *file; + const char *file; + char *file_copy, *dir; file = (config_pid_file != NULL) ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; + /* dirname may modify its argument */ + file_copy = strdup(file); + if (file_copy == NULL) + { + fprintf(stderr, "rrdcached: strdup(): %s\n", + rrd_strerror(errno)); + return -1; + } + + dir = dirname(file_copy); + if (rrd_mkdir_p(dir, 0777) != 0) + { + fprintf(stderr, "Failed to create pidfile directory '%s': %s\n", + dir, rrd_strerror(errno)); + return -1; + } + + free(file_copy); + fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", @@ -479,7 +508,7 @@ static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */ /* NOTREACHED */ assert(1==0); -} +} /* }}} char *next_cmd */ /* add the characters directly to the write buffer */ static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ @@ -507,10 +536,10 @@ static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ { va_list argp; - char buffer[CMD_MAX]; + char buffer[RRD_CMD_MAX]; int len; - if (sock == NULL) return 0; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return 0; if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); @@ -552,12 +581,12 @@ static int send_response (listen_socket_t *sock, response_code rc, char *fmt, ...) /* {{{ */ { va_list argp; - char buffer[CMD_MAX]; + char buffer[RRD_CMD_MAX]; int lines; ssize_t wrote; int rclen, len; - if (sock == NULL) return rc; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return rc; if (sock->batch_start) { @@ -619,6 +648,7 @@ static void wipe_ci_values(cache_item_t *ci, time_t when) { ci->values = NULL; ci->values_num = 0; + ci->values_alloc = 0; ci->last_flush_time = when; if (config_write_jitter > 0) @@ -798,9 +828,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { + gboolean status = g_tree_remove(cache_tree, cfd.keys[k]); /* should never fail, since we have held the cache_lock * the entire time */ - assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE ); + assert(status == TRUE); } if (cfd.keys != NULL) @@ -812,7 +843,7 @@ static int flush_old_values (int max_age) return (0); } /* int flush_old_values */ -static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *flush_thread_main (void UNUSED(*args)) /* {{{ */ { struct timeval now; struct timespec next_flush; @@ -865,7 +896,7 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ return NULL; } /* void *flush_thread_main */ -static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *queue_thread_main (void UNUSED(*args)) /* {{{ */ { pthread_mutex_lock (&cache_lock); @@ -1025,7 +1056,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ assert(file != NULL); if (!config_write_base_only - || sock == NULL /* journal replay */ + || JOURNAL_REPLAY(sock) || config_base_dir == NULL) return 1; @@ -1068,20 +1099,6 @@ static void get_abs_path(char **filename, char *tmp) *filename = tmp; } /* }}} static int get_abs_path */ -/* returns 1 if we have the required privilege level, - * otherwise issue an error to the user on sock */ -static int has_privilege (listen_socket_t *sock, /* {{{ */ - socket_privilege priv) -{ - if (sock == NULL) /* journal replay */ - return 1; - - if (sock->privilege >= priv) - return 1; - - return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); -} /* }}} static int has_privilege */ - static int flush_file (const char *filename) /* {{{ */ { cache_item_t *ci; @@ -1110,7 +1127,7 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ -static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */ +static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */ { char *err = "Syntax error.\n"; @@ -1268,7 +1285,7 @@ static int handle_request_forget(HANDLER_PROTO) /* {{{ */ if (found == TRUE) { - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("forget", file); return send_response(sock, RESP_OK, "Gone!\n"); @@ -1303,12 +1320,13 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ char *file, file_tmp[PATH_MAX]; int values_num = 0; int status; - char orig_buf[CMD_MAX]; + char orig_buf[RRD_CMD_MAX]; cache_item_t *ci; /* save it for the journal later */ - strncpy(orig_buf, buffer, sizeof(orig_buf)-1); + if (!JOURNAL_REPLAY(sock)) + strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size)); status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -1393,13 +1411,13 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ assert (ci != NULL); /* don't re-write updates in replay mode */ - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("update", orig_buf); while (buffer_size > 0) { char *value; - time_t stamp; + double stamp; char *eostamp; status = buffer_get_field (&buffer, &buffer_size, &value); @@ -1409,8 +1427,9 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ break; } - /* make sure update time is always moving forward */ - stamp = strtol(value, &eostamp, 10); + /* make sure update time is always moving forward. We use double here since + update does support subsecond precision for timestamps ... */ + stamp = strtod(value, &eostamp); if (eostamp == value || eostamp == NULL || *eostamp != ':') { pthread_mutex_unlock(&cache_lock); @@ -1421,14 +1440,15 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ { pthread_mutex_unlock(&cache_lock); return send_response(sock, RESP_ERR, - "illegal attempt to update using time %ld when last" - " update time is %ld (minimum one second step)\n", + "illegal attempt to update using time %lf when last" + " update time is %lf (minimum one second step)\n", stamp, ci->last_update_stamp); } else ci->last_update_stamp = stamp; - if (!rrd_add_strdup(&ci->values, &ci->values_num, value)) + if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value, + &ci->values_alloc, config_alloc_chunk)) { RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed."); continue; @@ -1457,6 +1477,197 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ } /* }}} int handle_request_update */ +static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ +{ + char *file, file_tmp[PATH_MAX]; + char *cf; + + char *start_str; + char *end_str; + time_t start_tm; + time_t end_tm; + + unsigned long step; + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + int status; + unsigned long i; + time_t t; + rrd_value_t *data_ptr; + + file = NULL; + cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do /* while (0) */ + { + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &cf); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &start_str); + if (status != 0) + { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field (&buffer, &buffer_size, &end_str); + if (status != 0) + { + end_str = NULL; + status = 0; + break; + } + } while (0); + + if (status != 0) + return (syntax_error(sock,cmd)); + + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) return 0; + + status = flush_file (file); + if ((status != 0) && (status != ENOENT)) + return (send_response (sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", file, status)); + + t = time (NULL); /* "now" */ + + /* Parse start time */ + if (start_str != NULL) + { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol (start_str, &endptr, /* base = */ 0); + if ((endptr == start_str) || (errno != 0)) + return (send_response(sock, RESP_ERR, + "Cannot parse start time `%s': Only simple integers are allowed.\n", + start_str)); + + if (value > 0) + start_tm = (time_t) value; + else + start_tm = (time_t) (t + value); + } + else + { + start_tm = t - 86400; + } + + /* Parse end time */ + if (end_str != NULL) + { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol (end_str, &endptr, /* base = */ 0); + if ((endptr == end_str) || (errno != 0)) + return (send_response(sock, RESP_ERR, + "Cannot parse end time `%s': Only simple integers are allowed.\n", + end_str)); + + if (value > 0) + end_tm = (time_t) value; + else + end_tm = (time_t) (t + value); + } + else + { + end_tm = t; + } + + step = -1; + ds_cnt = 0; + ds_namv = NULL; + data = NULL; + + status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step, + &ds_cnt, &ds_namv, &data); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error ())); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) end_tm); + add_response_info (sock, "Step: %lu\n", step); + add_response_info (sock, "DSCount: %lu\n", ds_cnt); + +#define SSTRCAT(buffer,str,buffer_fill) do { \ + size_t str_len = strlen (str); \ + if ((buffer_fill + str_len) > sizeof (buffer)) \ + str_len = sizeof (buffer) - buffer_fill; \ + if (str_len > 0) { \ + strncpy (buffer + buffer_fill, str, str_len); \ + buffer_fill += str_len; \ + assert (buffer_fill <= sizeof (buffer)); \ + if (buffer_fill == sizeof (buffer)) \ + buffer[buffer_fill - 1] = 0; \ + else \ + buffer[buffer_fill] = 0; \ + } \ + } while (0) + + { /* Add list of DS names */ + char linebuf[1024]; + size_t linebuf_fill; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + if (i > 0) + SSTRCAT (linebuf, " ", linebuf_fill); + SSTRCAT (linebuf, ds_namv[i], linebuf_fill); + rrd_freemem(ds_namv[i]); + } + rrd_freemem(ds_namv); + add_response_info (sock, "DSName: %s\n", linebuf); + } + + /* Add the actual data */ + assert (step > 0); + data_ptr = data; + for (t = start_tm + step; t <= end_tm; t += step) + { + char linebuf[1024]; + size_t linebuf_fill; + char tmp[128]; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr); + tmp[sizeof (tmp) - 1] = 0; + SSTRCAT (linebuf, tmp, linebuf_fill); + + data_ptr++; + } + + add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); + } /* for (t) */ + rrd_freemem(data); + + return (send_response (sock, RESP_OK, "Success\n")); +#undef SSTRCAT +} /* }}} int handle_request_fetch */ + /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ @@ -1484,6 +1695,192 @@ static int handle_request_wrote (HANDLER_PROTO) /* {{{ */ return (0); } /* }}} int handle_request_wrote */ +static int handle_request_info (HANDLER_PROTO) /* {{{ */ +{ + char *file, file_tmp[PATH_MAX]; + int status; + rrd_info_t *info; + + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return syntax_error(sock,cmd); + /* get full pathname */ + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) { + return send_response(sock, RESP_ERR, "Cannot read: %s\n", file); + } + /* get data */ + rrd_clear_error (); + info = rrd_info_r(file); + if(!info) { + return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); + } + for (rrd_info_t *data = info; data != NULL; data = data->next) { + switch (data->type) { + case RD_I_VAL: + if (isnan(data->value.u_val)) + add_response_info(sock,"%s %d NaN\n",data->key, data->type); + else + add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val); + break; + case RD_I_CNT: + add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt); + break; + case RD_I_INT: + add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int); + break; + case RD_I_STR: + add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str); + break; + case RD_I_BLO: + add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size); + break; + } + } + + rrd_info_free(info); + + return send_response(sock, RESP_OK, "Info for %s follows\n",file); +} /* }}} static int handle_request_info */ + +static int handle_request_first (HANDLER_PROTO) /* {{{ */ +{ + char *i, *file, file_tmp[PATH_MAX]; + int status; + int idx; + time_t t; + + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return syntax_error(sock,cmd); + /* get full pathname */ + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) { + return send_response(sock, RESP_ERR, "Cannot read: %s\n", file); + } + + status = buffer_get_field(&buffer, &buffer_size, &i); + if (status != 0) + return syntax_error(sock,cmd); + idx = atoi(i); + if(idx<0) { + return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx); + } + + /* get data */ + rrd_clear_error (); + t = rrd_first_r(file,idx); + if(t<1) { + return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); + } + return send_response(sock, RESP_OK, "%lu\n",(unsigned)t); +} /* }}} static int handle_request_first */ + + +static int handle_request_last (HANDLER_PROTO) /* {{{ */ +{ + char *file, file_tmp[PATH_MAX]; + int status; + time_t t, from_file, step; + rrd_file_t * rrd_file; + cache_item_t * ci; + rrd_t rrd; + + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return syntax_error(sock,cmd); + /* get full pathname */ + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) { + return send_response(sock, RESP_ERR, "Cannot read: %s\n", file); + } + rrd_clear_error(); + rrd_init(&rrd); + rrd_file = rrd_open(file,&rrd,RRD_READONLY); + if(!rrd_file) { + return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); + } + from_file = rrd.live_head->last_up; + step = rrd.stat_head->pdp_step; + rrd_close(rrd_file); + pthread_mutex_lock(&cache_lock); + ci = g_tree_lookup(cache_tree, file); + if (ci) + t = ci->last_update_stamp; + else + t = from_file; + pthread_mutex_unlock(&cache_lock); + t -= t % step; + rrd_free(&rrd); + if(t<1) { + return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n"); + } + return send_response(sock, RESP_OK, "%lu\n",(unsigned)t); +} /* }}} static int handle_request_last */ + +static int handle_request_create (HANDLER_PROTO) /* {{{ */ +{ + char *file, file_tmp[PATH_MAX]; + char *tok; + int ac = 0; + char *av[128]; + int status; + unsigned long step = 300; + time_t last_up = time(NULL)-10; + int no_overwrite = opt_no_overwrite; + + + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return syntax_error(sock,cmd); + /* get full pathname */ + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) { + return send_response(sock, RESP_ERR, "Cannot read: %s\n", file); + } + RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file); + + while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) { + if( ! strncmp(tok,"-b",2) ) { + status = buffer_get_field(&buffer, &buffer_size, &tok ); + if (status != 0) return syntax_error(sock,cmd); + last_up = (time_t) atol(tok); + continue; + } + if( ! strncmp(tok,"-s",2) ) { + status = buffer_get_field(&buffer, &buffer_size, &tok ); + if (status != 0) return syntax_error(sock,cmd); + step = atol(tok); + continue; + } + if( ! strncmp(tok,"-O",2) ) { + no_overwrite = 1; + continue; + } + if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; } + if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; } + return syntax_error(sock,cmd); + } + if(step<1) { + return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n"); + } + if (last_up < 3600 * 24 * 365 * 10) { + return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n"); + } + + rrd_clear_error (); + status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av); + + if(!status) { + return send_response(sock, RESP_OK, "RRD created OK\n"); + } + return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); +} /* }}} static int handle_request_create */ + /* start "BATCH" processing */ static int batch_start (HANDLER_PROTO) /* {{{ */ { @@ -1513,11 +1910,10 @@ static int handle_request_quit (HANDLER_PROTO) /* {{{ */ return -1; } /* }}} static int handle_request_quit */ -struct command COMMANDS[] = { +static command_t list_of_commands[] = { /* {{{ */ { "UPDATE", handle_request_update, - PRIV_HIGH, CMD_CONTEXT_ANY, "UPDATE [ ...]\n" , @@ -1532,7 +1928,6 @@ struct command COMMANDS[] = { { "WROTE", handle_request_wrote, - PRIV_HIGH, CMD_CONTEXT_JOURNAL, NULL, NULL @@ -1540,7 +1935,6 @@ struct command COMMANDS[] = { { "FLUSH", handle_request_flush, - PRIV_LOW, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, "FLUSH \n" , @@ -1550,7 +1944,6 @@ struct command COMMANDS[] = { { "FLUSHALL", handle_request_flushall, - PRIV_HIGH, CMD_CONTEXT_CLIENT, "FLUSHALL\n" , @@ -1559,7 +1952,6 @@ struct command COMMANDS[] = { { "PENDING", handle_request_pending, - PRIV_HIGH, CMD_CONTEXT_CLIENT, "PENDING \n" , @@ -1569,7 +1961,6 @@ struct command COMMANDS[] = { { "FORGET", handle_request_forget, - PRIV_HIGH, CMD_CONTEXT_ANY, "FORGET \n" , @@ -1579,7 +1970,6 @@ struct command COMMANDS[] = { { "QUEUE", handle_request_queue, - PRIV_LOW, CMD_CONTEXT_CLIENT, "QUEUE\n" , @@ -1592,7 +1982,6 @@ struct command COMMANDS[] = { { "STATS", handle_request_stats, - PRIV_LOW, CMD_CONTEXT_CLIENT, "STATS\n" , @@ -1602,7 +1991,6 @@ struct command COMMANDS[] = { { "HELP", handle_request_help, - PRIV_LOW, CMD_CONTEXT_CLIENT, "HELP []\n", NULL, /* special! */ @@ -1610,7 +1998,6 @@ struct command COMMANDS[] = { { "BATCH", batch_start, - PRIV_LOW, CMD_CONTEXT_CLIENT, "BATCH\n" , @@ -1634,44 +2021,160 @@ struct command COMMANDS[] = { { ".", /* BATCH terminator */ batch_done, - PRIV_LOW, CMD_CONTEXT_BATCH, NULL, NULL }, { + "FETCH", + handle_request_fetch, + CMD_CONTEXT_CLIENT, + "FETCH [ []]\n" + , + "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n" + }, + { + "INFO", + handle_request_info, + CMD_CONTEXT_CLIENT, + "INFO \n", + "The INFO command retrieves information about a specified RRD file.\n" + "This is returned in standard rrdinfo format, a sequence of lines\n" + "with the format = \n" + "Note that this is the data as of the last update of the RRD file itself,\n" + "not the last time data was received via rrdcached, so there may be pending\n" + "updates in the queue. If this bothers you, then first run a FLUSH.\n" + }, + { + "FIRST", + handle_request_first, + CMD_CONTEXT_CLIENT, + "FIRST \n", + "The FIRST command retrieves the first data time for a specified RRA in\n" + "an RRD file.\n" + }, + { + "LAST", + handle_request_last, + CMD_CONTEXT_CLIENT, + "LAST \n", + "The LAST command retrieves the last update time for a specified RRD file.\n" + "Note that this is the time of the last update of the RRD file itself, not\n" + "the last time data was received via rrdcached, so there may be pending\n" + "updates in the queue. If this bothers you, then first run a FLUSH.\n" + }, + { + "CREATE", + handle_request_create, + CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, + "CREATE [-b start] [-s step] [-O] \n", + "The CREATE command will create an RRD file, overwriting any existing file\n" + "unless the -O option is given or rrdcached was started with the -O option.\n" + "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n" + "not acceptable) and the step is in seconds (default is 300).\n" + "The DS and RRA definitions are as for the 'rrdtool create' command.\n" + }, + { "QUIT", handle_request_quit, - PRIV_LOW, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, "QUIT\n" , "Disconnect from rrdcached.\n" - }, - {NULL,NULL,0,0,NULL,NULL} /* LAST ENTRY */ -}; + } +}; /* }}} command_t list_of_commands[] */ +static size_t list_of_commands_len = sizeof (list_of_commands) + / sizeof (list_of_commands[0]); -static struct command *find_command(char *cmd) +static command_t *find_command(char *cmd) { - struct command *c = COMMANDS; - - while (c->cmd != NULL) - { - if (strcasecmp(cmd, c->cmd) == 0) - break; - c++; - } + size_t i; - if (c->cmd == NULL) - return NULL; - else - return c; + for (i = 0; i < list_of_commands_len; i++) + if (strcasecmp(cmd, list_of_commands[i].cmd) == 0) + return (&list_of_commands[i]); + return NULL; } +/* We currently use the index in the `list_of_commands' array as a bit position + * in `listen_socket_t.permissions'. This member schould NEVER be accessed from + * outside these functions so that switching to a more elegant storage method + * is easily possible. */ +static ssize_t find_command_index (const char *cmd) /* {{{ */ +{ + size_t i; + + for (i = 0; i < list_of_commands_len; i++) + if (strcasecmp(cmd, list_of_commands[i].cmd) == 0) + return ((ssize_t) i); + return (-1); +} /* }}} ssize_t find_command_index */ + +static int socket_permission_check (listen_socket_t *sock, /* {{{ */ + const char *cmd) +{ + ssize_t i; + + if (JOURNAL_REPLAY(sock)) + return (1); + + if (cmd == NULL) + return (-1); + + if ((strcasecmp ("QUIT", cmd) == 0) + || (strcasecmp ("HELP", cmd) == 0)) + return (1); + else if (strcmp (".", cmd) == 0) + cmd = "BATCH"; + + i = find_command_index (cmd); + if (i < 0) + return (-1); + assert (i < 32); + + if ((sock->permissions & (1 << i)) != 0) + return (1); + return (0); +} /* }}} int socket_permission_check */ + +static int socket_permission_add (listen_socket_t *sock, /* {{{ */ + const char *cmd) +{ + ssize_t i; + + i = find_command_index (cmd); + if (i < 0) + return (-1); + assert (i < 32); + + sock->permissions |= (1 << i); + return (0); +} /* }}} int socket_permission_add */ + +static void socket_permission_clear (listen_socket_t *sock) /* {{{ */ +{ + sock->permissions = 0; +} /* }}} socket_permission_clear */ + +static void socket_permission_copy (listen_socket_t *dest, /* {{{ */ + listen_socket_t *src) +{ + dest->permissions = src->permissions; +} /* }}} socket_permission_copy */ + +static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */ +{ + size_t i; + + sock->permissions = 0; + for (i = 0; i < list_of_commands_len; i++) + sock->permissions |= (1 << i); +} /* }}} void socket_permission_set_all */ + /* check whether commands are received in the expected context */ -static int command_check_context(listen_socket_t *sock, struct command *cmd) +static int command_check_context(listen_socket_t *sock, command_t *cmd) { - if (sock == NULL) + if (JOURNAL_REPLAY(sock)) return (cmd->context & CMD_CONTEXT_JOURNAL); else if (sock->batch_start) return (cmd->context & CMD_CONTEXT_BATCH); @@ -1687,7 +2190,7 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ int status; char *cmd_str; char *resp_txt; - struct command *help = NULL; + command_t *help = NULL; status = buffer_get_field (&buffer, &buffer_size, &cmd_str); if (status == 0) @@ -1695,7 +2198,7 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ if (help && (help->syntax || help->help)) { - char tmp[CMD_MAX]; + char tmp[RRD_CMD_MAX]; snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd); resp_txt = tmp; @@ -1708,26 +2211,26 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ } else { - help = COMMANDS; + size_t i; + resp_txt = "Command overview\n"; - while (help->cmd) + for (i = 0; i < list_of_commands_len; i++) { - if (help->syntax) - add_response_info(sock, "%s", help->syntax); - help++; + if (list_of_commands[i].syntax == NULL) + continue; + add_response_info (sock, "%s", list_of_commands[i].syntax); } } return send_response(sock, RESP_OK, resp_txt); } /* }}} int handle_request_help */ -/* if sock==NULL, we are in journal replay mode */ static int handle_request (DISPATCH_PROTO) /* {{{ */ { char *buffer_ptr = buffer; char *cmd_str = NULL; - struct command *cmd = NULL; + command_t *cmd = NULL; int status; assert (buffer[buffer_size - 1] == '\0'); @@ -1746,9 +2249,8 @@ static int handle_request (DISPATCH_PROTO) /* {{{ */ if (!cmd) return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str); - status = has_privilege(sock, cmd->min_priv); - if (status <= 0) - return status; + if (!socket_permission_check (sock, cmd->cmd)) + return send_response(sock, RESP_ERR, "Permission denied.\n"); if (!command_check_context(sock, cmd)) return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str); @@ -1931,7 +2433,7 @@ static int journal_replay (const char *file) /* {{{ */ int entry_cnt = 0; int fail_cnt = 0; uint64_t line = 0; - char entry[CMD_MAX]; + char entry[RRD_CMD_MAX]; time_t now; if (file == NULL) return 0; @@ -2062,6 +2564,10 @@ static void journal_init(void) /* {{{ */ } dir = opendir(journal_dir); + if (!dir) { + RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir); + return; + } while ((dent = readdir(dir)) != NULL) { /* looks like a journal file? */ @@ -2137,6 +2643,21 @@ static void *connection_thread_main (void *args) /* {{{ */ } pthread_mutex_lock (&connection_threads_lock); +#ifdef HAVE_LIBWRAP + /* LIBWRAP does not support multiple threads! By putting this code + inside pthread_mutex_lock we do not have to worry about request_info + getting overwritten by another thread. + */ + struct request_info req; + request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL ); + fromhost(&req); + if(!hosts_access(&req)) { + RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req)); + pthread_mutex_unlock (&connection_threads_lock); + close_connection(sock); + return NULL; + } +#endif /* HAVE_LIBWRAP */ connection_threads_num++; pthread_mutex_unlock (&connection_threads_lock); @@ -2222,11 +2743,31 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */ listen_socket_t *temp; int status; const char *path; + char *path_copy, *dir; path = sock->addr; if (strncmp(path, "unix:", strlen("unix:")) == 0) path += strlen("unix:"); + /* dirname may modify its argument */ + path_copy = strdup(path); + if (path_copy == NULL) + { + fprintf(stderr, "rrdcached: strdup(): %s\n", + rrd_strerror(errno)); + return (-1); + } + + dir = dirname(path_copy); + if (rrd_mkdir_p(dir, 0777) != 0) + { + fprintf(stderr, "Failed to create socket directory '%s': %s\n", + dir, rrd_strerror(errno)); + return (-1); + } + + free(path_copy); + temp = (listen_socket_t *) rrd_realloc (listen_fds, sizeof (listen_fds[0]) * (listen_fds_num + 1)); if (temp == NULL) @@ -2264,6 +2805,23 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */ return (-1); } + /* tweak the sockets group ownership */ + if (sock->socket_group != (gid_t)-1) + { + if ( (chown(path, getuid(), sock->socket_group) != 0) || + (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) ) + { + fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno)); + } + } + + if (sock->socket_permissions != (mode_t)-1) + { + if (chmod(path, sock->socket_permissions) != 0) + fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n", + (unsigned int)sock->socket_permissions, strerror(errno)); + } + status = listen (fd, /* backlog = */ 10); if (status != 0) { @@ -2329,8 +2887,8 @@ static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */ fprintf (stderr, "rrdcached: Garbage after address: %s\n", port); return (-1); } - } /* if (*addr = ']') */ - else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */ + } /* if (*addr == '[') */ + else { port = rindex(addr, ':'); if (port != NULL) @@ -2436,7 +2994,7 @@ static int close_listen_sockets (void) /* {{{ */ return (0); } /* }}} int close_listen_sockets */ -static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *listen_thread_main (void UNUSED(*args)) /* {{{ */ { struct pollfd *pollfds; int pollfds_num; @@ -2573,10 +3131,14 @@ static int daemonize (void) /* {{{ */ } else { - listen_socket_t sock; - memset(&sock, 0, sizeof(sock)); - strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1); - open_listen_socket (&sock); + strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS, + sizeof(default_socket.addr) - 1); + default_socket.addr[sizeof(default_socket.addr) - 1] = '\0'; + + if (default_socket.permissions == 0) + socket_permission_set_all (&default_socket); + + open_listen_socket (&default_socket); } if (listen_fds_num < 1) @@ -2660,7 +3222,6 @@ static int cleanup (void) /* {{{ */ free(queue_threads); free(config_base_dir); - free(config_pid_file); pthread_mutex_lock(&cache_lock); g_tree_destroy(cache_tree); @@ -2672,6 +3233,7 @@ static int cleanup (void) /* {{{ */ closelog (); remove_pidfile (); + free(config_pid_file); return (0); } /* }}} int cleanup */ @@ -2681,15 +3243,23 @@ static int read_options (int argc, char **argv) /* {{{ */ int option; int status = 0; - while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1) + socket_permission_clear (&default_socket); + + default_socket.socket_group = (gid_t)-1; + default_socket.socket_permissions = (mode_t)-1; + + while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1) { switch (option) { + case 'O': + opt_no_overwrite = 1; + break; + case 'g': stay_foreground=1; break; - case 'L': case 'l': { listen_socket_t *new; @@ -2703,7 +3273,21 @@ static int read_options (int argc, char **argv) /* {{{ */ memset(new, 0, sizeof(listen_socket_t)); strncpy(new->addr, optarg, sizeof(new->addr)-1); - new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW; + + /* Add permissions to the socket {{{ */ + if (default_socket.permissions != 0) + { + socket_permission_copy (new, &default_socket); + } + else /* if (default_socket.permissions == 0) */ + { + /* Add permission for ALL commands to the socket. */ + socket_permission_set_all (new); + } + /* }}} Done adding permissions. */ + + new->socket_group = default_socket.socket_group; + new->socket_permissions = default_socket.socket_permissions; if (!rrd_add_ptr((void ***)&config_listen_address_list, &config_listen_address_list_len, new)) @@ -2714,6 +3298,83 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + /* set socket group permissions */ + case 's': + { + gid_t group_gid; + struct group *grp; + + group_gid = strtoul(optarg, NULL, 10); + if (errno != EINVAL && group_gid>0) + { + /* we were passed a number */ + grp = getgrgid(group_gid); + } + else + { + grp = getgrnam(optarg); + } + + if (grp) + { + default_socket.socket_group = grp->gr_gid; + } + else + { + /* no idea what the user wanted... */ + fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg); + return (5); + } + } + break; + + /* set socket file permissions */ + case 'm': + { + long tmp; + char *endptr = NULL; + + tmp = strtol (optarg, &endptr, 8); + if ((endptr == optarg) || (! endptr) || (*endptr != '\0') + || (tmp > 07777) || (tmp < 0)) { + fprintf (stderr, "read_options: Invalid file mode \"%s\".\n", + optarg); + return (5); + } + + default_socket.socket_permissions = (mode_t)tmp; + } + break; + + case 'P': + { + char *optcopy; + char *saveptr; + char *dummy; + char *ptr; + + socket_permission_clear (&default_socket); + + optcopy = strdup (optarg); + dummy = optcopy; + saveptr = NULL; + while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL) + { + dummy = NULL; + status = socket_permission_add (&default_socket, ptr); + if (status != 0) + { + fprintf (stderr, "read_options: Adding permission \"%s\" to " + "socket failed. Most likely, this permission doesn't " + "exist. Check your command line.\n", ptr); + status = 4; + } + } + + free (optcopy); + } + break; + case 'f': { int temp; @@ -2792,6 +3453,13 @@ static int read_options (int argc, char **argv) /* {{{ */ return (3); } + if (rrd_mkdir_p (config_base_dir, 0777) != 0) + { + fprintf (stderr, "Failed to create base directory '%s': %s\n", + config_base_dir, rrd_strerror (errno)); + return (3); + } + /* make sure that the base directory is not resolved via * symbolic links. this makes some performance-enhancing * assumptions possible (we don't have to resolve paths @@ -2799,17 +3467,8 @@ static int read_options (int argc, char **argv) /* {{{ */ */ if (realpath(config_base_dir, base_realpath) == NULL) { - fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir); - return 5; - } - else if (strncmp(config_base_dir, - base_realpath, sizeof(base_realpath)) != 0) - { - fprintf(stderr, - "Base directory (-b) resolved via file system links!\n" - "Please consult rrdcached '-b' documentation!\n" - "Consider specifying the real directory (%s)\n", - base_realpath); + fprintf (stderr, "Failed to canonicalize the base directory '%s': " + "%s\n", config_base_dir, rrd_strerror(errno)); return 5; } @@ -2827,6 +3486,24 @@ static int read_options (int argc, char **argv) /* {{{ */ } _config_base_dir_len = len; + + len = strlen (base_realpath); + while ((len > 0) && (base_realpath[len - 1] == '/')) + { + base_realpath[len - 1] = '\0'; + len--; + } + + if (strncmp(config_base_dir, + base_realpath, sizeof(base_realpath)) != 0) + { + fprintf(stderr, + "Base directory (-b) resolved via file system links!\n" + "Please consult rrdcached '-b' documentation!\n" + "Consider specifying the real directory (%s)\n", + base_realpath); + return 5; + } } break; @@ -2849,35 +3526,59 @@ static int read_options (int argc, char **argv) /* {{{ */ case 'j': { - struct stat statbuf; - const char *dir = journal_dir = strdup(optarg); - - status = stat(dir, &statbuf); - if (status != 0) - { - fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno)); - return 6; - } + char journal_dir_actual[PATH_MAX]; + journal_dir = realpath((const char *)optarg, journal_dir_actual); + if (journal_dir) + { + // if we were able to properly resolve the path, lets have a copy + // for use outside this block. + journal_dir = strdup(journal_dir); + status = rrd_mkdir_p(journal_dir, 0777); + if (status != 0) + { + fprintf(stderr, "Failed to create journal directory '%s': %s\n", + journal_dir, rrd_strerror(errno)); + return 6; + } + if (access(journal_dir, R_OK|W_OK|X_OK) != 0) + { + fprintf(stderr, "Must specify a writable directory with -j! (%s)\n", + errno ? rrd_strerror(errno) : ""); + return 6; + } + } else { + fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg, + errno ? rrd_strerror(errno) : ""); + return 6; + } + } + break; - if (!S_ISDIR(statbuf.st_mode) - || access(dir, R_OK|W_OK|X_OK) != 0) + case 'a': + { + int temp = atoi(optarg); + if (temp > 0) + config_alloc_chunk = temp; + else { - fprintf(stderr, "Must specify a writable directory with -j! (%s)\n", - errno ? rrd_strerror(errno) : ""); - return 6; + fprintf(stderr, "Invalid allocation size: %s\n", optarg); + return 10; } } break; case 'h': case '?': - printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n" + printf ("RRDCacheD %s\n" + "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n" "\n" "Usage: rrdcached [options]\n" "\n" "Valid options are:\n" " -l
Socket address to listen to.\n" - " -L
Socket address to listen to ('FLUSH' only).\n" + " Default: "RRDCACHED_DEFAULT_ADDRESS"\n" + " -P Sets the permissions to assign to all following " + "sockets\n" " -w Interval in which to write data.\n" " -z Delay writes up to seconds to spread load\n" " -t Number of write threads.\n" @@ -2888,12 +3589,23 @@ static int read_options (int argc, char **argv) /* {{{ */ " -g Do not fork and run in the foreground.\n" " -j Directory in which to create the journal files.\n" " -F Always flush all updates at shutdown\n" + " -s Group owner of all following UNIX sockets\n" + " (the socket will also have read/write permissions " + "for that group)\n" + " -m File permissions (octal) of all following UNIX " + "sockets\n" + " -a Memory allocation chunk size. Default is 1.\n" + " -O Do not allow CREATE commands to overwrite existing\n" + " files, even if asked to.\n" "\n" "For more information and a detailed description of all options " "please refer\n" "to the rrdcached(1) manual page.\n", VERSION); - status = -1; + if (option == 'h') + status = -1; + else + status = 1; break; } /* switch (option) */ } /* while (getopt) */