X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=3be6e7845487186af62697a67761a2ec0aa5d05f;hp=9b8d9ee30cb59ae59d8d27de6b96f4b0a72cde95;hb=46a2db0465372eae273b1ff827e465a779339198;hpb=9e2eb0abd095264f584490ede57ae5afe8beb748 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 9b8d9ee..3be6e78 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,6 +1,6 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008,2009 Florian octo Forster + * Copyright (C) 2008-2010 Florian octo Forster * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it @@ -73,6 +73,7 @@ #include "rrd.h" #include "rrd_client.h" +#include "unused.h" #include @@ -118,10 +119,6 @@ syslog ((severity), __VA_ARGS__); \ } while (0) -#ifndef __GNUC__ -# define __attribute__(x) /**/ -#endif - /* * Types */ @@ -155,12 +152,12 @@ typedef struct listen_socket_s listen_socket_t; struct command_s; typedef struct command_s command_t; /* note: guard against "unused" warnings in the handlers */ -#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\ - time_t now __attribute__((unused)),\ - char *buffer __attribute__((unused)),\ - size_t buffer_size __attribute__((unused)) +#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\ + time_t UNUSED(now),\ + char UNUSED(*buffer),\ + size_t UNUSED(buffer_size) -#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\ +#define HANDLER_PROTO command_t UNUSED(*cmd),\ DISPATCH_PROTO struct command_s { @@ -303,23 +300,23 @@ static void sig_common (const char *sig) /* {{{ */ pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ -static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_int_handler (int UNUSED(s)) /* {{{ */ { sig_common("INT"); } /* }}} void sig_int_handler */ -static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_term_handler (int UNUSED(s)) /* {{{ */ { sig_common("TERM"); } /* }}} void sig_term_handler */ -static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr1_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 1; sig_common("USR1"); } /* }}} void sig_usr1_handler */ -static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */ +static void sig_usr2_handler (int UNUSED(s)) /* {{{ */ { config_flush_at_shutdown = 0; sig_common("USR2"); @@ -844,7 +841,7 @@ static int flush_old_values (int max_age) return (0); } /* int flush_old_values */ -static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *flush_thread_main (void UNUSED(*args)) /* {{{ */ { struct timeval now; struct timespec next_flush; @@ -897,7 +894,7 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ return NULL; } /* void *flush_thread_main */ -static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *queue_thread_main (void UNUSED(*args)) /* {{{ */ { pthread_mutex_lock (&cache_lock); @@ -1477,6 +1474,176 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ } /* }}} int handle_request_update */ +static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ +{ + char *file; + char *cf; + + char *start_str; + char *end_str; + rrd_time_value_t start_tv; + rrd_time_value_t end_tv; + time_t start_tm; + time_t end_tm; + + unsigned long step; + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + int status; + unsigned long i; + time_t t; + rrd_value_t *data_ptr; + + file = NULL; + cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do /* while (0) */ + { + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &cf); + if (status != 0) + break; + + status = buffer_get_field (&buffer, &buffer_size, &start_str); + if (status != 0) + { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field (&buffer, &buffer_size, &end_str); + if (status != 0) + { + end_str = NULL; + status = 0; + break; + } + } while (0); + + if (status != 0) + return (syntax_error(sock,cmd)); + + status = flush_file (file); + if ((status != 0) && (status != ENOENT)) + return (send_response (sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", file, status)); + + /* Parse start time */ + if (start_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (start_str, &start_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse start time `%s': %s\n", start_str, errmsg)); + } + else + rrd_parsetime ("-86400", &start_tv); + + /* Parse end time */ + if (end_str != NULL) + { + const char *errmsg; + + errmsg = rrd_parsetime (end_str, &end_tv); + if (errmsg != NULL) + return (send_response(sock, RESP_ERR, + "Cannot parse end time `%s': %s\n", end_str, errmsg)); + } + else + rrd_parsetime ("now", &end_tv); + + start_tm = 0; + end_tm = 0; + status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_proc_start_end failed: %s\n", rrd_get_error ())); + + step = -1; + ds_cnt = 0; + ds_namv = NULL; + data = NULL; + + status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step, + &ds_cnt, &ds_namv, &data); + if (status != 0) + return (send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error ())); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) end_tm); + add_response_info (sock, "Step: %lu\n", step); + add_response_info (sock, "DSCount: %lu\n", ds_cnt); + +#define SSTRCAT(buffer,str,buffer_fill) do { \ + size_t str_len = strlen (str); \ + if ((buffer_fill + str_len) > sizeof (buffer)) \ + str_len = sizeof (buffer) - buffer_fill; \ + if (str_len > 0) { \ + strncpy (buffer + buffer_fill, str, str_len); \ + buffer_fill += str_len; \ + assert (buffer_fill <= sizeof (buffer)); \ + if (buffer_fill == sizeof (buffer)) \ + buffer[buffer_fill - 1] = 0; \ + else \ + buffer[buffer_fill] = 0; \ + } \ + } while (0) + + { /* Add list of DS names */ + char linebuf[1024]; + size_t linebuf_fill; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + if (i > 0) + SSTRCAT (linebuf, " ", linebuf_fill); + SSTRCAT (linebuf, ds_namv[i], linebuf_fill); + } + add_response_info (sock, "DSName: %s\n", linebuf); + } + + /* Add the actual data */ + assert (step > 0); + data_ptr = data; + for (t = start_tm + step; t <= end_tm; t += step) + { + char linebuf[1024]; + size_t linebuf_fill; + char tmp[128]; + + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + for (i = 0; i < ds_cnt; i++) + { + snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr); + tmp[sizeof (tmp) - 1] = 0; + SSTRCAT (linebuf, tmp, linebuf_fill); + + data_ptr++; + } + + add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); + } /* for (t) */ + + return (send_response (sock, RESP_OK, "Success\n")); +#undef SSTRCAT +} /* }}} int handle_request_fetch */ + /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ @@ -1649,6 +1816,14 @@ static command_t list_of_commands[] = { /* {{{ */ NULL }, { + "FETCH", + handle_request_fetch, + CMD_CONTEXT_CLIENT, + "FETCH [ []]\n" + , + "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n" + }, + { "QUIT", handle_request_quit, CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH, @@ -2529,7 +2704,7 @@ static int close_listen_sockets (void) /* {{{ */ return (0); } /* }}} int close_listen_sockets */ -static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ +static void *listen_thread_main (void UNUSED(*args)) /* {{{ */ { struct pollfd *pollfds; int pollfds_num; @@ -2780,7 +2955,7 @@ static int read_options (int argc, char **argv) /* {{{ */ gid_t socket_group = (gid_t)-1; mode_t socket_permissions = (mode_t)-1; - while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:m:h?")) != -1) + while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1) { switch (option) { @@ -3088,7 +3263,7 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; - case 'm': + case 'a': { int temp = atoi(optarg); if (temp > 0) @@ -3096,7 +3271,7 @@ static int read_options (int argc, char **argv) /* {{{ */ else { fprintf(stderr, "Invalid allocation size: %s\n", optarg); - status = 10; + return 10; } } break; @@ -3127,6 +3302,7 @@ static int read_options (int argc, char **argv) /* {{{ */ "for that group)\n" " -m File permissions (octal) of all following UNIX " "sockets\n" + " -a Memory allocation chunk size. Default is 1." "\n" "For more information and a detailed description of all options " "please refer\n"