X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=ecb82616ac8c98b40bba42f0989b982a9aceb657;hb=refs%2Fheads%2Fff%2Frrdc_fetch-graph;hp=0422389a7567706b82ae7bc1fb6a4e1065ee2e99;hpb=47e742a9618494029c6c4c32edeeb1353bf963d4;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 0422389..ecb8261 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,6 +1,6 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008,2009 Florian octo Forster + * Copyright (C) 2008-2010 Florian octo Forster * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it @@ -265,6 +265,7 @@ static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* Journaled updates */ +#define JOURNAL_REPLAY(s) ((s) == NULL) #define JOURNAL_BASE "rrd.journal" static journal_set *journal_cur = NULL; static journal_set *journal_old = NULL; @@ -528,7 +529,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ char buffer[CMD_MAX]; int len; - if (sock == NULL) return 0; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return 0; if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); @@ -575,7 +576,7 @@ static int send_response (listen_socket_t *sock, response_code rc, ssize_t wrote; int rclen, len; - if (sock == NULL) return rc; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return rc; if (sock->batch_start) { @@ -1043,7 +1044,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ assert(file != NULL); if (!config_write_base_only - || sock == NULL /* journal replay */ + || JOURNAL_REPLAY(sock) || config_base_dir == NULL) return 1; @@ -1272,7 +1273,7 @@ static int handle_request_forget(HANDLER_PROTO) /* {{{ */ if (found == TRUE) { - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("forget", file); return send_response(sock, RESP_OK, "Gone!\n"); @@ -1312,7 +1313,8 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ cache_item_t *ci; /* save it for the journal later */ - strncpy(orig_buf, buffer, sizeof(orig_buf)-1); + if (!JOURNAL_REPLAY(sock)) + strncpy(orig_buf, buffer, buffer_size); status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -1397,7 +1399,7 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ assert (ci != NULL); /* don't re-write updates in replay mode */ - if (sock != NULL) + if (!JOURNAL_REPLAY(sock)) journal_write("update", orig_buf); while (buffer_size > 0) @@ -1461,6 +1463,176 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ } /* }}} int handle_request_update */ +static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ +{ + char *file; + char *cf; + + char *start_str; + char *end_str; + rrd_time_value_t start_tv; + rrd_time_value_t end_tv; + time_t start_tm; + time_t end_tm; + + unsigned long step; + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + int status; + unsigned long i; + time_t t; + rrd_value_t *data_ptr; + + file = NULL; + cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do /* while (0) */ + { + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &cf); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &start_str); + if (status != 0) + { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field (&buffer, &buffer_size, &end_str); + if (status != 0) + { + end_str = NULL; + status = 0; + break; + } + } while (0); + + if (status != 0) + return (syntax_error(sock,cmd)); + + status = flush_file (file); + if ((status != 0) && (status != ENOENT)) + return (send_response (sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", file, status)); + + /* Parse start time */ + if (start_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (start_str, &start_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse start time `%s': %s\n", start_str, errmsg)); + } + else + rrd_parsetime ("-86400", &start_tv); + + /* Parse end time */ + if (end_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (end_str, &end_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse end time `%s': %s\n", end_str, errmsg)); + } + else + rrd_parsetime ("now", &end_tv); + + start_tm = 0; + end_tm = 0; + status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_proc_start_end failed: %s\n", rrd_get_error ())); + + step = -1; + ds_cnt = 0; + ds_namv = NULL; + data = NULL; + + status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step, + &ds_cnt, &ds_namv, &data); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error ())); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) end_tm); + add_response_info (sock, "Step: %lu\n", step); + add_response_info (sock, "DSCount: %lu\n", ds_cnt); + +#define SSTRCAT(buffer,str,buffer_fill) do { \ + size_t str_len = strlen (str); \ + if ((buffer_fill + str_len) > sizeof (buffer)) \ + str_len = sizeof (buffer) - buffer_fill; \ + if (str_len > 0) { \ + strncpy (buffer + buffer_fill, str, str_len); \ + buffer_fill += str_len; \ + assert (buffer_fill <= sizeof (buffer)); \ + if (buffer_fill == sizeof (buffer)) \ + buffer[buffer_fill - 1] = 0; \ + else \ + buffer[buffer_fill] = 0; \ + } \ + } while (0) + + { /* Add list of DS names */ + char linebuf[1024]; + size_t linebuf_fill; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + if (i > 0) + SSTRCAT (linebuf, " ", linebuf_fill); + SSTRCAT (linebuf, ds_namv[i], linebuf_fill); + } + add_response_info (sock, "DSName: %s\n", linebuf); + } + + /* Add the actual data */ + assert (step > 0); + data_ptr = data; + for (t = start_tm + step; t <= end_tm; t += step) + { + char linebuf[1024]; + size_t linebuf_fill; + char tmp[128]; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr); + tmp[sizeof (tmp) - 1] = 0; + SSTRCAT (linebuf, tmp, linebuf_fill); + + data_ptr++; + } + + add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); + } /* for (t) */ + + return (send_response (sock, RESP_OK, "Success\n")); +#undef SSTRCAT +} /* }}} int handle_request_fetch */ + /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ @@ -1633,6 +1805,14 @@ static command_t list_of_commands[] = { /* {{{ */ NULL }, { + "FETCH", + handle_request_fetch, + CMD_CONTEXT_CLIENT, + "FETCH [ []]\n" + , + "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n" + }, + { "QUIT", handle_request_quit, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, @@ -1673,6 +1853,9 @@ static int socket_permission_check (listen_socket_t *sock, /* {{{ */ { ssize_t i; + if (JOURNAL_REPLAY(sock)) + return (1); + if (cmd == NULL) return (-1); @@ -1709,7 +1892,7 @@ static int socket_permission_add (listen_socket_t *sock, /* {{{ */ /* check whether commands are received in the expected context */ static int command_check_context(listen_socket_t *sock, command_t *cmd) { - if (sock == NULL) + if (JOURNAL_REPLAY(sock)) return (cmd->context & CMD_CONTEXT_JOURNAL); else if (sock->batch_start) return (cmd->context & CMD_CONTEXT_BATCH); @@ -1761,7 +1944,6 @@ static int handle_request_help (HANDLER_PROTO) /* {{{ */ return send_response(sock, RESP_OK, resp_txt); } /* }}} int handle_request_help */ -/* if sock==NULL, we are in journal replay mode */ static int handle_request (DISPATCH_PROTO) /* {{{ */ { char *buffer_ptr = buffer; @@ -2387,8 +2569,8 @@ static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */ fprintf (stderr, "rrdcached: Garbage after address: %s\n", port); return (-1); } - } /* if (*addr = ']') */ - else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */ + } /* if (*addr == '[') */ + else { port = rindex(addr, ':'); if (port != NULL)