X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=ecb82616ac8c98b40bba42f0989b982a9aceb657;hb=refs%2Fheads%2Fff%2Frrdc_fetch-graph;hp=308d3b6b1f42bc062469209da91ca5da0adaff02;hpb=7339e36323bf1e6954be28e317b7ca0db8e56def;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 308d3b6..ecb8261 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,6 +1,6 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008,2009 Florian octo Forster + * Copyright (C) 2008-2010 Florian octo Forster * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it @@ -105,6 +105,7 @@ #include #include #include +#include #include /* }}} */ @@ -264,6 +265,7 @@ static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* Journaled updates */ +#define JOURNAL_REPLAY(s) ((s) == NULL) #define JOURNAL_BASE "rrd.journal" static journal_set *journal_cur = NULL; static journal_set *journal_old = NULL; @@ -348,12 +350,32 @@ static void install_signal_handlers(void) /* {{{ */ static int open_pidfile(char *action, int oflag) /* {{{ */ { int fd; - char *file; + const char *file; + char *file_copy, *dir; file = (config_pid_file != NULL) ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; + /* dirname may modify its argument */ + file_copy = strdup(file); + if (file_copy == NULL) + { + fprintf(stderr, "rrdcached: strdup(): %s\n", + rrd_strerror(errno)); + return -1; + } + + dir = dirname(file_copy); + if (rrd_mkdir_p(dir, 0777) != 0) + { + fprintf(stderr, "Failed to create pidfile directory '%s': %s\n", + dir, rrd_strerror(errno)); + return -1; + } + + free(file_copy); + fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", @@ -507,7 +529,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ char buffer[CMD_MAX]; int len; - if (sock == NULL) return 0; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return 0; if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); @@ -554,7 +576,7 @@ static int send_response (listen_socket_t *sock, response_code rc, ssize_t wrote; int rclen, len; - if (sock == NULL) return rc; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return rc; if (sock->batch_start) { @@ -1022,7 +1044,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ assert(file != NULL); if (!config_write_base_only - || sock == NULL /* journal replay */ + || JOURNAL_REPLAY(sock) || config_base_dir == NULL) return 1; @@ -1251,7 +1273,7 @@ static int handle_request_forget(HANDLER_PROTO) /* {{{ */ if (found == TRUE) { - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("forget", file); return send_response(sock, RESP_OK, "Gone!\n"); @@ -1291,7 +1313,8 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ cache_item_t *ci; /* save it for the journal later */ - strncpy(orig_buf, buffer, sizeof(orig_buf)-1); + if (!JOURNAL_REPLAY(sock)) + strncpy(orig_buf, buffer, buffer_size); status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -1376,7 +1399,7 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ assert (ci != NULL); /* don't re-write updates in replay mode */ - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("update", orig_buf); while (buffer_size > 0) @@ -1440,6 +1463,176 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ } /* }}} int handle_request_update */ +static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ +{ + char *file; + char *cf; + + char *start_str; + char *end_str; + rrd_time_value_t start_tv; + rrd_time_value_t end_tv; + time_t start_tm; + time_t end_tm; + + unsigned long step; + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + int status; + unsigned long i; + time_t t; + rrd_value_t *data_ptr; + + file = NULL; + cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do /* while (0) */ + { + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &cf); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &start_str); + if (status != 0) + { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field (&buffer, &buffer_size, &end_str); + if (status != 0) + { + end_str = NULL; + status = 0; + break; + } + } while (0); + + if (status != 0) + return (syntax_error(sock,cmd)); + + status = flush_file (file); + if ((status != 0) && (status != ENOENT)) + return (send_response (sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", file, status)); + + /* Parse start time */ + if (start_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (start_str, &start_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse start time `%s': %s\n", start_str, errmsg)); + } + else + rrd_parsetime ("-86400", &start_tv); + + /* Parse end time */ + if (end_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (end_str, &end_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse end time `%s': %s\n", end_str, errmsg)); + } + else + rrd_parsetime ("now", &end_tv); + + start_tm = 0; + end_tm = 0; + status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_proc_start_end failed: %s\n", rrd_get_error ())); + + step = -1; + ds_cnt = 0; + ds_namv = NULL; + data = NULL; + + status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step, + &ds_cnt, &ds_namv, &data); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error ())); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) end_tm); + add_response_info (sock, "Step: %lu\n", step); + add_response_info (sock, "DSCount: %lu\n", ds_cnt); + +#define SSTRCAT(buffer,str,buffer_fill) do { \ + size_t str_len = strlen (str); \ + if ((buffer_fill + str_len) > sizeof (buffer)) \ + str_len = sizeof (buffer) - buffer_fill; \ + if (str_len > 0) { \ + strncpy (buffer + buffer_fill, str, str_len); \ + buffer_fill += str_len; \ + assert (buffer_fill <= sizeof (buffer)); \ + if (buffer_fill == sizeof (buffer)) \ + buffer[buffer_fill - 1] = 0; \ + else \ + buffer[buffer_fill] = 0; \ + } \ + } while (0) + + { /* Add list of DS names */ + char linebuf[1024]; + size_t linebuf_fill; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + if (i > 0) + SSTRCAT (linebuf, " ", linebuf_fill); + SSTRCAT (linebuf, ds_namv[i], linebuf_fill); + } + add_response_info (sock, "DSName: %s\n", linebuf); + } + + /* Add the actual data */ + assert (step > 0); + data_ptr = data; + for (t = start_tm + step; t <= end_tm; t += step) + { + char linebuf[1024]; + size_t linebuf_fill; + char tmp[128]; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr); + tmp[sizeof (tmp) - 1] = 0; + SSTRCAT (linebuf, tmp, linebuf_fill); + + data_ptr++; + } + + add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); + } /* for (t) */ + + return (send_response (sock, RESP_OK, "Success\n")); +#undef SSTRCAT +} /* }}} int handle_request_fetch */ + /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ @@ -1612,6 +1805,14 @@ static command_t list_of_commands[] = { /* {{{ */ NULL }, { + "FETCH", + handle_request_fetch, + CMD_CONTEXT_CLIENT, + "FETCH [ []]\n" + , + "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n" + }, + { "QUIT", handle_request_quit, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, @@ -1652,6 +1853,9 @@ static int socket_permission_check (listen_socket_t *sock, /* {{{ */ { ssize_t i; + if (JOURNAL_REPLAY(sock)) + return (1); + if (cmd == NULL) return (-1); @@ -1688,7 +1892,7 @@ static int socket_permission_add (listen_socket_t *sock, /* {{{ */ /* check whether commands are received in the expected context */ static int command_check_context(listen_socket_t *sock, command_t *cmd) { - if (sock == NULL) + if (JOURNAL_REPLAY(sock)) return (cmd->context & CMD_CONTEXT_JOURNAL); else if (sock->batch_start) return (cmd->context & CMD_CONTEXT_BATCH); @@ -1740,7 +1944,6 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ return send_response(sock, RESP_OK, resp_txt); } /* }}} int handle_request_help */ -/* if sock==NULL, we are in journal replay mode */ static int handle_request (DISPATCH_PROTO) /* {{{ */ { char *buffer_ptr = buffer; @@ -2239,11 +2442,31 @@ static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */ listen_socket_t *temp; int status; const char *path; + char *path_copy, *dir; path = sock->addr; if (strncmp(path, "unix:", strlen("unix:")) == 0) path += strlen("unix:"); + /* dirname may modify its argument */ + path_copy = strdup(path); + if (path_copy == NULL) + { + fprintf(stderr, "rrdcached: strdup(): %s\n", + rrd_strerror(errno)); + return (-1); + } + + dir = dirname(path_copy); + if (rrd_mkdir_p(dir, 0777) != 0) + { + fprintf(stderr, "Failed to create socket directory '%s': %s\n", + dir, rrd_strerror(errno)); + return (-1); + } + + free(path_copy); + temp = (listen_socket_t *) rrd_realloc (listen_fds, sizeof (listen_fds[0]) * (listen_fds_num + 1)); if (temp == NULL) @@ -2346,8 +2569,8 @@ static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */ fprintf (stderr, "rrdcached: Garbage after address: %s\n", port); return (-1); } - } /* if (*addr = ']') */ - else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */ + } /* if (*addr == '[') */ + else { port = rindex(addr, ':'); if (port != NULL) @@ -2677,7 +2900,6 @@ static int cleanup (void) /* {{{ */ free(queue_threads); free(config_base_dir); - free(config_pid_file); pthread_mutex_lock(&cache_lock); g_tree_destroy(cache_tree); @@ -2689,6 +2911,7 @@ static int cleanup (void) /* {{{ */ closelog (); remove_pidfile (); + free(config_pid_file); return (0); } /* }}} int cleanup */ @@ -2866,6 +3089,13 @@ static int read_options (int argc, char **argv) /* {{{ */ return (3); } + if (rrd_mkdir_p (config_base_dir, 0777) != 0) + { + fprintf (stderr, "Failed to create base directory '%s': %s\n", + config_base_dir, rrd_strerror (errno)); + return (3); + } + /* make sure that the base directory is not resolved via * symbolic links. this makes some performance-enhancing * assumptions possible (we don't have to resolve paths @@ -2873,17 +3103,8 @@ static int read_options (int argc, char **argv) /* {{{ */ */ if (realpath(config_base_dir, base_realpath) == NULL) { - fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir); - return 5; - } - else if (strncmp(config_base_dir, - base_realpath, sizeof(base_realpath)) != 0) - { - fprintf(stderr, - "Base directory (-b) resolved via file system links!\n" - "Please consult rrdcached '-b' documentation!\n" - "Consider specifying the real directory (%s)\n", - base_realpath); + fprintf (stderr, "Failed to canonicalize the base directory '%s': " + "%s\n", config_base_dir, rrd_strerror(errno)); return 5; } @@ -2901,6 +3122,24 @@ static int read_options (int argc, char **argv) /* {{{ */ } _config_base_dir_len = len; + + len = strlen (base_realpath); + while ((len > 0) && (base_realpath[len - 1] == '/')) + { + base_realpath[len - 1] = '\0'; + len--; + } + + if (strncmp(config_base_dir, + base_realpath, sizeof(base_realpath)) != 0) + { + fprintf(stderr, + "Base directory (-b) resolved via file system links!\n" + "Please consult rrdcached '-b' documentation!\n" + "Consider specifying the real directory (%s)\n", + base_realpath); + return 5; + } } break;