X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=4c3d7ed4653d30256c35909a15cc7b1c7e2fb285;hp=19460032ad911d9c98613ec5faebf959da769e7c;hb=cea28dc31a16fcc1f1b8b95aee3fada2fbd2aca0;hpb=14e4172649ba3fd67f5d82addf3813928ee3ba23 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 1946003..4c3d7ed 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,6 +1,6 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008,2009 Florian octo Forster + * Copyright (C) 2008-2010 Florian octo Forster * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it @@ -73,6 +73,7 @@ #include "rrd.h" #include "rrd_client.h" +#include "unused.h" #include @@ -106,15 +107,17 @@ #include #include #include +#include #include /* }}} */ -#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) - -#ifndef __GNUC__ -# define __attribute__(x) /**/ -#endif +#define RRDD_LOG(severity, ...) \ + do { \ + if (stay_foreground) \ + fprintf(stderr, __VA_ARGS__); \ + syslog ((severity), __VA_ARGS__); \ + } while (0) /* * Types @@ -140,18 +143,21 @@ struct listen_socket_s ssize_t wbuf_len; uint32_t permissions; + + gid_t socket_group; + mode_t socket_permissions; }; typedef struct listen_socket_s listen_socket_t; struct command_s; typedef struct command_s command_t; /* note: guard against "unused" warnings in the handlers */ -#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\ - time_t now __attribute__((unused)),\ - char *buffer __attribute__((unused)),\ - size_t buffer_size __attribute__((unused)) +#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\ + time_t UNUSED(now),\ + char UNUSED(*buffer),\ + size_t UNUSED(buffer_size) -#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\ +#define HANDLER_PROTO command_t UNUSED(*cmd),\ DISPATCH_PROTO struct command_s { @@ -174,7 +180,8 @@ struct cache_item_s { char *file; char **values; - size_t values_num; + size_t values_num; /* number of valid pointers */ + size_t values_alloc; /* number of allocated pointers */ time_t last_flush_time; time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) @@ -251,6 +258,7 @@ static char *config_pid_file = NULL; static char *config_base_dir = NULL; static size_t _config_base_dir_len = 0; static int config_write_base_only = 0; +static size_t config_alloc_chunk = 1; static listen_socket_t **config_listen_address_list = NULL; static size_t config_listen_address_list_len = 0; @@ -265,6 +273,7 @@ static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* Journaled updates */ +#define JOURNAL_REPLAY(s) ((s) == NULL) #define JOURNAL_BASE "rrd.journal" static journal_set *journal_cur = NULL; static journal_set *journal_old = NULL; @@ -291,23 +300,23 @@ static void sig_common (const char *sig) /* {{{ */ pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ -static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_int_handler (int UNUSED(s)) /* {{{ */ { sig_common("INT"); } /* }}} void sig_int_handler */ -static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_term_handler (int UNUSED(s)) /* {{{ */ { sig_common("TERM"); } /* }}} void sig_term_handler */ -static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr1_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 1; sig_common("USR1"); } /* }}} void sig_usr1_handler */ -static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr2_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 0; sig_common("USR2"); @@ -528,7 +537,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ char buffer[CMD_MAX]; int len; - if (sock == NULL) return 0; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return 0; if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); @@ -575,7 +584,7 @@ static int send_response (listen_socket_t *sock, response_code rc, ssize_t wrote; int rclen, len; - if (sock == NULL) return rc; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return rc; if (sock->batch_start) { @@ -637,6 +646,7 @@ static void wipe_ci_values(cache_item_t *ci, time_t when) { ci->values = NULL; ci->values_num = 0; + ci->values_alloc = 0; ci->last_flush_time = when; if (config_write_jitter > 0) @@ -816,9 +826,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { + gboolean status = g_tree_remove(cache_tree, cfd.keys[k]); /* should never fail, since we have held the cache_lock * the entire time */ - assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE ); + assert(status == TRUE); } if (cfd.keys != NULL) @@ -830,7 +841,7 @@ static int flush_old_values (int max_age) return (0); } /* int flush_old_values */ -static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *flush_thread_main (void UNUSED(*args)) /* {{{ */ { struct timeval now; struct timespec next_flush; @@ -883,7 +894,7 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ return NULL; } /* void *flush_thread_main */ -static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *queue_thread_main (void UNUSED(*args)) /* {{{ */ { pthread_mutex_lock (&cache_lock); @@ -1043,7 +1054,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ assert(file != NULL); if (!config_write_base_only - || sock == NULL /* journal replay */ + || JOURNAL_REPLAY(sock) || config_base_dir == NULL) return 1; @@ -1272,7 +1283,7 @@ static int handle_request_forget(HANDLER_PROTO) /* {{{ */ if (found == TRUE) { - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("forget", file); return send_response(sock, RESP_OK, "Gone!\n"); @@ -1312,7 +1323,7 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ cache_item_t *ci; /* save it for the journal later */ - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) strncpy(orig_buf, buffer, buffer_size); status = buffer_get_field (&buffer, &buffer_size, &file); @@ -1398,7 +1409,7 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ assert (ci != NULL); /* don't re-write updates in replay mode */ - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("update", orig_buf); while (buffer_size > 0) @@ -1433,7 +1444,8 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ else ci->last_update_stamp = stamp; - if (!rrd_add_strdup(&ci->values, &ci->values_num, value)) + if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value, + &ci->values_alloc, config_alloc_chunk)) { RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed."); continue; @@ -1462,6 +1474,197 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ } /* }}} int handle_request_update */ +static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ +{ + char *file, file_tmp[PATH_MAX]; + char *cf; + + char *start_str; + char *end_str; + time_t start_tm; + time_t end_tm; + + unsigned long step; + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + int status; + unsigned long i; + time_t t; + rrd_value_t *data_ptr; + + file = NULL; + cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do /* while (0) */ + { + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &cf); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &start_str); + if (status != 0) + { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field (&buffer, &buffer_size, &end_str); + if (status != 0) + { + end_str = NULL; + status = 0; + break; + } + } while (0); + + if (status != 0) + return (syntax_error(sock,cmd)); + + get_abs_path(&file, file_tmp); + if (!check_file_access(file, sock)) return 0; + + status = flush_file (file); + if ((status != 0) && (status != ENOENT)) + return (send_response (sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", file, status)); + + t = time (NULL); /* "now" */ + + /* Parse start time */ + if (start_str != NULL) + { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol (start_str, &endptr, /* base = */ 0); + if ((endptr == start_str) || (errno != 0)) + return (send_response(sock, RESP_ERR, + "Cannot parse start time `%s': Only simple integers are allowed.\n", + start_str)); + + if (value > 0) + start_tm = (time_t) value; + else + start_tm = (time_t) (t + value); + } + else + { + start_tm = t - 86400; + } + + /* Parse end time */ + if (end_str != NULL) + { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol (end_str, &endptr, /* base = */ 0); + if ((endptr == end_str) || (errno != 0)) + return (send_response(sock, RESP_ERR, + "Cannot parse end time `%s': Only simple integers are allowed.\n", + end_str)); + + if (value > 0) + end_tm = (time_t) value; + else + end_tm = (time_t) (t + value); + } + else + { + end_tm = t; + } + + step = -1; + ds_cnt = 0; + ds_namv = NULL; + data = NULL; + + status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step, + &ds_cnt, &ds_namv, &data); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error ())); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) end_tm); + add_response_info (sock, "Step: %lu\n", step); + add_response_info (sock, "DSCount: %lu\n", ds_cnt); + +#define SSTRCAT(buffer,str,buffer_fill) do { \ + size_t str_len = strlen (str); \ + if ((buffer_fill + str_len) > sizeof (buffer)) \ + str_len = sizeof (buffer) - buffer_fill; \ + if (str_len > 0) { \ + strncpy (buffer + buffer_fill, str, str_len); \ + buffer_fill += str_len; \ + assert (buffer_fill <= sizeof (buffer)); \ + if (buffer_fill == sizeof (buffer)) \ + buffer[buffer_fill - 1] = 0; \ + else \ + buffer[buffer_fill] = 0; \ + } \ + } while (0) + + { /* Add list of DS names */ + char linebuf[1024]; + size_t linebuf_fill; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + if (i > 0) + SSTRCAT (linebuf, " ", linebuf_fill); + SSTRCAT (linebuf, ds_namv[i], linebuf_fill); + rrd_freemem(ds_namv[i]); + } + rrd_freemem(ds_namv); + add_response_info (sock, "DSName: %s\n", linebuf); + } + + /* Add the actual data */ + assert (step > 0); + data_ptr = data; + for (t = start_tm + step; t <= end_tm; t += step) + { + char linebuf[1024]; + size_t linebuf_fill; + char tmp[128]; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr); + tmp[sizeof (tmp) - 1] = 0; + SSTRCAT (linebuf, tmp, linebuf_fill); + + data_ptr++; + } + + add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); + } /* for (t) */ + rrd_freemem(data); + + return (send_response (sock, RESP_OK, "Success\n")); +#undef SSTRCAT +} /* }}} int handle_request_fetch */ + /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ @@ -1634,6 +1837,14 @@ static command_t list_of_commands[] = { /* {{{ */ NULL }, { + "FETCH", + handle_request_fetch, + CMD_CONTEXT_CLIENT, + "FETCH [ []]\n" + , + "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n" + }, + { "QUIT", handle_request_quit, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, @@ -1674,7 +1885,7 @@ static int socket_permission_check (listen_socket_t *sock, /* {{{ */ { ssize_t i; - if (sock == NULL) /* journal replay */ + if (JOURNAL_REPLAY(sock)) return (1); if (cmd == NULL) @@ -1713,7 +1924,7 @@ static int socket_permission_add (listen_socket_t *sock, /* {{{ */ /* check whether commands are received in the expected context */ static int command_check_context(listen_socket_t *sock, command_t *cmd) { - if (sock == NULL) + if (JOURNAL_REPLAY(sock)) return (cmd->context & CMD_CONTEXT_JOURNAL); else if (sock->batch_start) return (cmd->context & CMD_CONTEXT_BATCH); @@ -1765,7 +1976,6 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ return send_response(sock, RESP_OK, resp_txt); } /* }}} int handle_request_help */ -/* if sock==NULL, we are in journal replay mode */ static int handle_request (DISPATCH_PROTO) /* {{{ */ { char *buffer_ptr = buffer; @@ -2104,6 +2314,10 @@ static void journal_init(void) /* {{{ */ } dir = opendir(journal_dir); + if (!dir) { + RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir); + return; + } while ((dent = readdir(dir)) != NULL) { /* looks like a journal file? */ @@ -2326,6 +2540,23 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */ return (-1); } + /* tweak the sockets group ownership */ + if (sock->socket_group != (gid_t)-1) + { + if ( (chown(path, getuid(), sock->socket_group) != 0) || + (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) ) + { + fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno)); + } + } + + if (sock->socket_permissions != (mode_t)-1) + { + if (chmod(path, sock->socket_permissions) != 0) + fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n", + (unsigned int)sock->socket_permissions, strerror(errno)); + } + status = listen (fd, /* backlog = */ 10); if (status != 0) { @@ -2498,7 +2729,7 @@ static int close_listen_sockets (void) /* {{{ */ return (0); } /* }}} int close_listen_sockets */ -static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *listen_thread_main (void UNUSED(*args)) /* {{{ */ { struct pollfd *pollfds; int pollfds_num; @@ -2746,7 +2977,10 @@ static int read_options (int argc, char **argv) /* {{{ */ char **permissions = NULL; size_t permissions_len = 0; - while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1) + gid_t socket_group = (gid_t)-1; + mode_t socket_permissions = (mode_t)-1; + + while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1) { switch (option) { @@ -2802,6 +3036,9 @@ static int read_options (int argc, char **argv) /* {{{ */ } /* }}} Done adding permissions. */ + new->socket_group = socket_group; + new->socket_permissions = socket_permissions; + if (!rrd_add_ptr((void ***)&config_listen_address_list, &config_listen_address_list_len, new)) { @@ -2811,6 +3048,54 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + /* set socket group permissions */ + case 's': + { + gid_t group_gid; + struct group *grp; + + group_gid = strtoul(optarg, NULL, 10); + if (errno != EINVAL && group_gid>0) + { + /* we were passed a number */ + grp = getgrgid(group_gid); + } + else + { + grp = getgrnam(optarg); + } + + if (grp) + { + socket_group = grp->gr_gid; + } + else + { + /* no idea what the user wanted... */ + fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg); + return (5); + } + } + break; + + /* set socket file permissions */ + case 'm': + { + long tmp; + char *endptr = NULL; + + tmp = strtol (optarg, &endptr, 8); + if ((endptr == optarg) || (! endptr) || (*endptr != '\0') + || (tmp > 07777) || (tmp < 0)) { + fprintf (stderr, "read_options: Invalid file mode \"%s\".\n", + optarg); + return (5); + } + + socket_permissions = (mode_t)tmp; + } + break; + case 'P': { char *optcopy; @@ -2984,7 +3269,9 @@ static int read_options (int argc, char **argv) /* {{{ */ case 'j': { - const char *dir = journal_dir = strdup(optarg); + char journal_dir_actual[PATH_MAX]; + const char *dir; + dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual)); status = rrd_mkdir_p(dir, 0777); if (status != 0) @@ -3003,6 +3290,19 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'a': + { + int temp = atoi(optarg); + if (temp > 0) + config_alloc_chunk = temp; + else + { + fprintf(stderr, "Invalid allocation size: %s\n", optarg); + return 10; + } + } + break; + case 'h': case '?': printf ("RRDCacheD %s\n" @@ -3024,12 +3324,21 @@ static int read_options (int argc, char **argv) /* {{{ */ " -g Do not fork and run in the foreground.\n" " -j Directory in which to create the journal files.\n" " -F Always flush all updates at shutdown\n" + " -s Group owner of all following UNIX sockets\n" + " (the socket will also have read/write permissions " + "for that group)\n" + " -m File permissions (octal) of all following UNIX " + "sockets\n" + " -a Memory allocation chunk size. Default is 1." "\n" "For more information and a detailed description of all options " "please refer\n" "to the rrdcached(1) manual page.\n", VERSION); - status = -1; + if (option == 'h') + status = -1; + else + status = 1; break; } /* switch (option) */ } /* while (getopt) */