X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_client.c;h=0d2128387a8ed7633ca3125fb5a9b2d0d571f29f;hp=d1ad5e06ef0cf4ee07b63d20e172cb0166562d06;hb=b47c2e44707f965100a561660a45f89b5759cbc1;hpb=a12627275ff8487174cbb907a066f62a00b6ae44 diff --git a/src/rrd_client.c b/src/rrd_client.c index d1ad5e0..0d21283 100644 --- a/src/rrd_client.c +++ b/src/rrd_client.c @@ -1,30 +1,38 @@ /** * RRDTool - src/rrd_client.c - * Copyright (C) 2008 Florian octo Forster + * Copyright (C) 2008-2010 Florian octo Forster * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; only version 2 of the License is applicable. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. * * Authors: * Florian octo Forster + * Sebastian tokkee Harl **/ #include "rrd.h" -#include "rrd_client.h" #include "rrd_tool.h" +#include "rrd_client.h" +#include #include #include +#include #include #include #include @@ -32,6 +40,7 @@ #include #include #include +#include #ifndef ENODATA #define ENODATA ENOENT @@ -48,84 +57,199 @@ typedef struct rrdc_response_s rrdc_response_t; static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static int sd = -1; +static FILE *sh = NULL; static char *sd_path = NULL; /* cache the path for sd */ -static void _disconnect(void); -static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ +/* get_path: Return a path name appropriate to be sent to the daemon. + * + * When talking to a local daemon (thru a UNIX socket), relative path names + * are resolved to absolute path names to allow for transparent integration + * into existing solutions (as requested by Tobi). Else, absolute path names + * are not allowed, since path name translation is done by the server. + * + * One must hold `lock' when calling this function. */ +static const char *get_path (const char *path, char *resolved_path) /* {{{ */ { - char *buffer; - size_t buffer_used; - size_t buffer_free; - ssize_t status; + const char *ret = path; + int is_unix = 0; - buffer = (char *) buffer_void; - buffer_used = 0; - buffer_free = buffer_size; + if ((path == NULL) || (resolved_path == NULL) || (sd_path == NULL)) + return (NULL); - while (buffer_free > 0) - { - status = read (sd, buffer + buffer_used, buffer_free); - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; - - if (status < 0) - return (-1); + if ((*sd_path == '/') + || (strncmp ("unix:", sd_path, strlen ("unix:")) == 0)) + is_unix = 1; - if (status == 0) + if (is_unix) + { + ret = realpath(path, resolved_path); + if (ret == NULL) + rrd_set_error("realpath(%s): %s", path, rrd_strerror(errno)); + return ret; + } + else + { + if (*path == '/') /* not absolute path */ { - _disconnect(); - errno = EPROTO; - return (-1); + rrd_set_error ("absolute path names not allowed when talking " + "to a remote daemon"); + return NULL; } + } - assert ((0 > status) || (buffer_free >= (size_t) status)); + return path; +} /* }}} char *get_path */ - buffer_free -= status; - buffer_used += status; +static size_t strsplit (char *string, char **fields, size_t size) /* {{{ */ +{ + size_t i; + char *ptr; + char *saveptr; - if (buffer[buffer_used - 1] == '\n') + i = 0; + ptr = string; + saveptr = NULL; + while ((fields[i] = strtok_r (ptr, " \t\r\n", &saveptr)) != NULL) + { + ptr = NULL; + i++; + + if (i >= size) break; } - if (buffer[buffer_used - 1] != '\n') - { - errno = ENOBUFS; + return (i); +} /* }}} size_t strsplit */ + +static int parse_header (char *line, /* {{{ */ + char **ret_key, char **ret_value) +{ + char *tmp; + + *ret_key = line; + + tmp = strchr (line, ':'); + if (tmp == NULL) return (-1); + + do + { + *tmp = 0; + tmp++; } + while ((tmp[0] == ' ') || (tmp[0] == '\t')); - buffer[buffer_used - 1] = '\0'; - return (buffer_used); -} /* }}} ssize_t sread */ + if (*tmp == 0) + return (-1); -static ssize_t swrite (const void *buf, size_t count) /* {{{ */ + *ret_value = tmp; + return (0); +} /* }}} int parse_header */ + +static int parse_ulong_header (char *line, /* {{{ */ + char **ret_key, unsigned long *ret_value) { - const char *ptr; - size_t nleft; - ssize_t status; + char *str_value; + char *endptr; + int status; + + str_value = NULL; + status = parse_header (line, ret_key, &str_value); + if (status != 0) + return (status); - ptr = (const char *) buf; - nleft = count; + endptr = NULL; + errno = 0; + *ret_value = (unsigned long) strtol (str_value, &endptr, /* base = */ 0); + if ((endptr == str_value) || (errno != 0)) + return (-1); - while (nleft > 0) + return (0); +} /* }}} int parse_ulong_header */ + +static int parse_char_array_header (char *line, /* {{{ */ + char **ret_key, char **array, size_t array_len, int alloc) +{ + char *tmp_array[array_len]; + char *value; + size_t num; + int status; + + value = NULL; + status = parse_header (line, ret_key, &value); + if (status != 0) + return (-1); + + num = strsplit (value, tmp_array, array_len); + if (num != array_len) + return (-1); + + if (alloc == 0) { - status = write (sd, (const void *) ptr, nleft); + memcpy (array, tmp_array, sizeof (tmp_array)); + } + else + { + size_t i; - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; + for (i = 0; i < array_len; i++) + array[i] = strdup (tmp_array[i]); + } - if (status < 0) - { - _disconnect(); - rrd_set_error("lost connection to rrdcached"); - return (status); - } + return (0); +} /* }}} int parse_char_array_header */ + +static int parse_value_array_header (char *line, /* {{{ */ + time_t *ret_time, rrd_value_t *array, size_t array_len) +{ + char *str_key; + char *str_array[array_len]; + char *endptr; + int status; + size_t i; + + str_key = NULL; + status = parse_char_array_header (line, &str_key, + str_array, array_len, /* alloc = */ 0); + if (status != 0) + return (-1); + + errno = 0; + endptr = NULL; + *ret_time = (time_t) strtol (str_key, &endptr, /* base = */ 10); + if ((endptr == str_key) || (errno != 0)) + return (-1); - nleft -= status; - ptr += status; + for (i = 0; i < array_len; i++) + { + endptr = NULL; + array[i] = (rrd_value_t) strtod (str_array[i], &endptr); + if ((endptr == str_array[i]) || (errno != 0)) + return (-1); } return (0); -} /* }}} ssize_t swrite */ +} /* }}} int parse_value_array_header */ + +/* One must hold `lock' when calling `close_connection'. */ +static void close_connection (void) /* {{{ */ +{ + if (sh != NULL) + { + fclose (sh); + sh = NULL; + sd = -1; + } + else if (sd >= 0) + { + close (sd); + sd = -1; + } + + if (sd_path != NULL) + free (sd_path); + sd_path = NULL; +} /* }}} void close_connection */ static int buffer_add_string (const char *str, /* {{{ */ char **buffer_ret, size_t *buffer_size_ret) @@ -192,103 +316,157 @@ static int buffer_add_value (const char *value, /* {{{ */ return (buffer_add_string (temp, buffer_ret, buffer_size_ret)); } /* }}} int buffer_add_value */ -static int response_parse (char *buffer, size_t buffer_size, /* {{{ */ - rrdc_response_t **ret_response) +/* Remove trailing newline (NL) and carriage return (CR) characters. Similar to + * the Perl function `chomp'. Returns the number of characters that have been + * removed. */ +static int chomp (char *str) /* {{{ */ { - rrdc_response_t *ret; + size_t len; + int removed; - char *dummy; - char *saveptr; + if (str == NULL) + return (-1); + + len = strlen (str); + removed = 0; + while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r'))) + { + str[len - 1] = 0; + len--; + removed++; + } - char *line_ptr; - size_t line_counter; + return (removed); +} /* }}} int chomp */ - if (buffer == NULL) - return (EINVAL); - if (buffer_size <= 0) - return (EINVAL); +static void response_free (rrdc_response_t *res) /* {{{ */ +{ + if (res == NULL) + return; + + if (res->lines != NULL) + { + size_t i; - if (buffer[buffer_size - 1] != 0) + for (i = 0; i < res->lines_num; i++) + if (res->lines[i] != NULL) + free (res->lines[i]); + free (res->lines); + } + + free (res); +} /* }}} void response_free */ + +static int response_read (rrdc_response_t **ret_response) /* {{{ */ +{ + rrdc_response_t *ret; + + char buffer[4096]; + char *buffer_ptr; + + size_t i; + + if (sh == NULL) return (-1); ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t)); if (ret == NULL) - return (ENOMEM); + return (-2); memset (ret, 0, sizeof (*ret)); + ret->lines = NULL; + ret->lines_num = 0; - line_counter = 0; + buffer_ptr = fgets (buffer, sizeof (buffer), sh); + if (buffer_ptr == NULL) { + close_connection(); + return (-3); + } + chomp (buffer); - dummy = buffer; - saveptr = NULL; - while ((line_ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL) + ret->status = strtol (buffer, &ret->message, 0); + if (buffer == ret->message) { - dummy = NULL; + response_free (ret); + close_connection(); + return (-4); + } + /* Skip leading whitespace of the status message */ + ret->message += strspn (ret->message, " \t"); - if (ret->message == NULL) - { - ret->status = strtol (buffer, &ret->message, 0); - if (buffer == ret->message) - { - free (ret); - return (EPROTO); - } + if (ret->status <= 0) + { + if (ret->status < 0) + rrd_set_error("rrdcached: %s", ret->message); + *ret_response = ret; + return (0); + } - /* Skip leading whitespace of the status message */ - ret->message += strspn (ret->message, " \t"); + ret->lines = (char **) malloc (sizeof (char *) * ret->status); + if (ret->lines == NULL) + { + response_free (ret); + close_connection(); + return (-5); + } + memset (ret->lines, 0, sizeof (char *) * ret->status); + ret->lines_num = (size_t) ret->status; - if (ret->status > 0) - { - ret->lines = (char **) malloc (sizeof (char *) * ret->status); - if (ret->lines == NULL) - { - free (ret); - return (ENOMEM); - } - memset (ret->lines, 0, sizeof (char *) * ret->status); - ret->lines_num = (size_t) ret->status; - } - else - { - ret->lines = NULL; - ret->lines_num = 0; - } - } - else /* if (ret->message != NULL) */ + for (i = 0; i < ret->lines_num; i++) + { + buffer_ptr = fgets (buffer, sizeof (buffer), sh); + if (buffer_ptr == NULL) { - if (line_counter < ret->lines_num) - ret->lines[line_counter] = line_ptr; - line_counter++; + response_free (ret); + close_connection(); + return (-6); } - } /* while (strtok_r) */ + chomp (buffer); - if (ret->lines_num != line_counter) - { - errno = EPROTO; - if (ret->lines != NULL) - free (ret->lines); - free (ret); - return (-1); + ret->lines[i] = strdup (buffer); + if (ret->lines[i] == NULL) + { + response_free (ret); + close_connection(); + return (-7); + } } *ret_response = ret; return (0); -} /* }}} int response_parse */ +} /* }}} rrdc_response_t *response_read */ -static void response_free (rrdc_response_t *res) /* {{{ */ +static int request (const char *buffer, size_t buffer_size, /* {{{ */ + rrdc_response_t **ret_response) { - if (res == NULL) - return; + int status; + rrdc_response_t *res; - if (res->lines != NULL) + if (sh == NULL) + return (ENOTCONN); + + status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh); + if (status != 1) { - res->lines_num = 0; - free (res->lines); - res->lines = NULL; + close_connection (); + rrd_set_error("request: socket error (%d) while talking to rrdcached", + status); + return (-1); } + fflush (sh); - free (res); -} /* }}} void response_free */ + res = NULL; + status = response_read (&res); + + if (status != 0) + { + if (status < 0) + rrd_set_error("request: internal error while talking to rrdcached"); + return (status); + } + *ret_response = res; + return (0); +} /* }}} int request */ /* determine whether we are connected to the specified daemon_addr if * NULL, return whether we are connected at all @@ -340,21 +518,37 @@ static int rrdc_connect_unix (const char *path) /* {{{ */ if (status != 0) { status = errno; + close_connection (); + return (status); + } + + sh = fdopen (sd, "r+"); + if (sh == NULL) + { + status = errno; + close_connection (); return (status); } return (0); } /* }}} int rrdc_connect_unix */ -static int rrdc_connect_network (const char *addr) /* {{{ */ +static int rrdc_connect_network (const char *addr_orig) /* {{{ */ { struct addrinfo ai_hints; struct addrinfo *ai_res; struct addrinfo *ai_ptr; + char addr_copy[NI_MAXHOST]; + char *addr; + char *port; - assert (addr != NULL); + assert (addr_orig != NULL); assert (sd == -1); + strncpy(addr_copy, addr_orig, sizeof(addr_copy)); + addr_copy[sizeof(addr_copy) - 1] = '\0'; + addr = addr_copy; + int status; memset (&ai_hints, 0, sizeof (ai_hints)); ai_hints.ai_flags = 0; @@ -364,10 +558,52 @@ static int rrdc_connect_network (const char *addr) /* {{{ */ ai_hints.ai_family = AF_UNSPEC; ai_hints.ai_socktype = SOCK_STREAM; + port = NULL; + if (*addr == '[') /* IPv6+port format */ + { + /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */ + addr++; + + port = strchr (addr, ']'); + if (port == NULL) + { + rrd_set_error("malformed address: %s", addr_orig); + return (-1); + } + *port = 0; + port++; + + if (*port == ':') + port++; + else if (*port == 0) + port = NULL; + else + { + rrd_set_error("garbage after address: %s", port); + return (-1); + } + } /* if (*addr == '[') */ + else + { + port = rindex(addr, ':'); + if (port != NULL) + { + *port = 0; + port++; + } + } + ai_res = NULL; - status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); + status = getaddrinfo (addr, + port == NULL ? RRDCACHED_DEFAULT_PORT : port, + &ai_hints, &ai_res); if (status != 0) - return (status); + { + rrd_set_error ("failed to resolve address `%s' (port %s): %s", + addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port, + gai_strerror (status)); + return (-1); + } for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { @@ -383,7 +619,15 @@ static int rrdc_connect_network (const char *addr) /* {{{ */ if (status != 0) { status = errno; - _disconnect(); + close_connection(); + continue; + } + + sh = fdopen (sd, "r+"); + if (sh == NULL) + { + status = errno; + close_connection (); continue; } @@ -414,9 +658,10 @@ int rrdc_connect (const char *addr) /* {{{ */ } else { - _disconnect(); + close_connection(); } + rrd_clear_error (); if (strncmp ("unix:", addr, strlen ("unix:")) == 0) status = rrdc_connect_unix (addr + strlen ("unix:")); else if (addr[0] == '/') @@ -427,32 +672,28 @@ int rrdc_connect (const char *addr) /* {{{ */ if (status == 0 && sd >= 0) sd_path = strdup(addr); else + { + char *err = rrd_test_error () ? rrd_get_error () : "Internal error"; + /* err points the string that gets written to by rrd_set_error(), thus we + * cannot pass it to that function */ + err = strdup (err); rrd_set_error("Unable to connect to rrdcached: %s", (status < 0) - ? "Internal error" + ? (err ? err : "Internal error") : rrd_strerror (status)); + if (err != NULL) + free (err); + } pthread_mutex_unlock (&lock); return (status); } /* }}} int rrdc_connect */ -static void _disconnect(void) /* {{{ */ -{ - if (sd >= 0) - close(sd); - - if (sd_path != NULL) - free(sd_path); - - sd = -1; - sd_path = NULL; -} /* }}} static void _disconnect(void) */ - int rrdc_disconnect (void) /* {{{ */ { pthread_mutex_lock (&lock); - _disconnect(); + close_connection(); pthread_mutex_unlock (&lock); @@ -466,8 +707,10 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */ char *buffer_ptr; size_t buffer_free; size_t buffer_size; + rrdc_response_t *res; int status; int i; + char file_path[PATH_MAX]; memset (buffer, 0, sizeof (buffer)); buffer_ptr = &buffer[0]; @@ -477,15 +720,29 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */ if (status != 0) return (ENOBUFS); + pthread_mutex_lock (&lock); + filename = get_path (filename, file_path); + if (filename == NULL) + { + pthread_mutex_unlock (&lock); + return (-1); + } + status = buffer_add_string (filename, &buffer_ptr, &buffer_free); if (status != 0) + { + pthread_mutex_unlock (&lock); return (ENOBUFS); + } for (i = 0; i < values_num; i++) { status = buffer_add_value (values[i], &buffer_ptr, &buffer_free); if (status != 0) + { + pthread_mutex_unlock (&lock); return (ENOBUFS); + } } assert (buffer_free < sizeof (buffer)); @@ -493,37 +750,16 @@ int rrdc_update (const char *filename, int values_num, /* {{{ */ assert (buffer[buffer_size - 1] == ' '); buffer[buffer_size - 1] = '\n'; - pthread_mutex_lock (&lock); - - if (sd < 0) - { - pthread_mutex_unlock (&lock); - return (ENOTCONN); - } + res = NULL; + status = request (buffer, buffer_size, &res); + pthread_mutex_unlock (&lock); - status = swrite (buffer, buffer_size); if (status != 0) - { - pthread_mutex_unlock (&lock); return (status); - } - status = sread (buffer, sizeof (buffer)); - if (status < 0) - { - status = errno; - pthread_mutex_unlock (&lock); - return (status); - } - else if (status == 0) - { - pthread_mutex_unlock (&lock); - return (ENODATA); - } + status = res->status; + response_free (res); - pthread_mutex_unlock (&lock); - - status = atoi (buffer); return (status); } /* }}} int rrdc_update */ @@ -533,7 +769,9 @@ int rrdc_flush (const char *filename) /* {{{ */ char *buffer_ptr; size_t buffer_free; size_t buffer_size; + rrdc_response_t *res; int status; + char file_path[PATH_MAX]; if (filename == NULL) return (-1); @@ -546,50 +784,248 @@ int rrdc_flush (const char *filename) /* {{{ */ if (status != 0) return (ENOBUFS); + pthread_mutex_lock (&lock); + filename = get_path (filename, file_path); + if (filename == NULL) + { + pthread_mutex_unlock (&lock); + return (-1); + } + status = buffer_add_string (filename, &buffer_ptr, &buffer_free); if (status != 0) + { + pthread_mutex_unlock (&lock); return (ENOBUFS); + } assert (buffer_free < sizeof (buffer)); buffer_size = sizeof (buffer) - buffer_free; assert (buffer[buffer_size - 1] == ' '); buffer[buffer_size - 1] = '\n'; - pthread_mutex_lock (&lock); + res = NULL; + status = request (buffer, buffer_size, &res); + pthread_mutex_unlock (&lock); - if (sd < 0) + if (status != 0) + return (status); + + status = res->status; + response_free (res); + + return (status); +} /* }}} int rrdc_flush */ + +int rrdc_fetch (const char *filename, /* {{{ */ + const char *cf, + time_t *ret_start, time_t *ret_end, + unsigned long *ret_step, + unsigned long *ret_ds_num, + char ***ret_ds_names, + rrd_value_t **ret_data) +{ + char buffer[4096]; + char *buffer_ptr; + size_t buffer_free; + size_t buffer_size; + rrdc_response_t *res; + char path_buffer[PATH_MAX]; + const char *path_ptr; + + char *str_tmp; + unsigned long flush_version; + + time_t start; + time_t end; + unsigned long step; + unsigned long ds_num; + char **ds_names; + + rrd_value_t *data; + size_t data_size; + size_t data_fill; + + int status; + size_t current_line; + time_t t; + + if ((filename == NULL) || (cf == NULL)) + return (-1); + + /* Send request {{{ */ + memset (buffer, 0, sizeof (buffer)); + buffer_ptr = &buffer[0]; + buffer_free = sizeof (buffer); + + status = buffer_add_string ("FETCH", &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + /* change to path for rrdcached */ + path_ptr = get_path (filename, path_buffer); + if (path_ptr == NULL) + return (EINVAL); + + status = buffer_add_string (path_ptr, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + status = buffer_add_string (cf, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + if ((ret_start != NULL) && (*ret_start > 0)) { - pthread_mutex_unlock (&lock); - return (ENOTCONN); + char tmp[64]; + snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_start); + tmp[sizeof (tmp) - 1] = 0; + status = buffer_add_string (tmp, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + if ((ret_end != NULL) && (*ret_end > 0)) + { + snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_end); + tmp[sizeof (tmp) - 1] = 0; + status = buffer_add_string (tmp, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + } } - status = swrite (buffer, buffer_size); + assert (buffer_free < sizeof (buffer)); + buffer_size = sizeof (buffer) - buffer_free; + assert (buffer[buffer_size - 1] == ' '); + buffer[buffer_size - 1] = '\n'; + + res = NULL; + status = request (buffer, buffer_size, &res); if (status != 0) - { - pthread_mutex_unlock (&lock); return (status); - } - status = sread (buffer, sizeof (buffer)); + status = res->status; if (status < 0) { - status = errno; - pthread_mutex_unlock (&lock); + rrd_set_error ("rrdcached: %s", res->message); + response_free (res); return (status); } - else if (status == 0) + /* }}} Send request */ + + ds_names = NULL; + ds_num = 0; + data = NULL; + current_line = 0; + + /* Macros to make error handling a little easier (i. e. less to type and + * read. `BAIL_OUT' sets the error message, frees all dynamically allocated + * variables and returns the provided status code. */ +#define BAIL_OUT(status, ...) do { \ + rrd_set_error ("rrdc_fetch: " __VA_ARGS__); \ + free (data); \ + if (ds_names != 0) { size_t k; for (k = 0; k < ds_num; k++) free (ds_names[k]); } \ + free (ds_names); \ + response_free (res); \ + return (status); \ + } while (0) + +#define READ_NUMERIC_FIELD(name,type,var) do { \ + char *key; \ + unsigned long value; \ + assert (current_line < res->lines_num); \ + status = parse_ulong_header (res->lines[current_line], &key, &value); \ + if (status != 0) \ + BAIL_OUT (-1, "Unable to parse header `%s'", name); \ + if (strcasecmp (key, name) != 0) \ + BAIL_OUT (-1, "Unexpected header line: Expected `%s', got `%s'", name, key); \ + var = (type) value; \ + current_line++; \ + } while (0) + + if (res->lines_num < 1) + BAIL_OUT (-1, "Premature end of response packet"); + + /* We're making some very strong assumptions about the fields below. We + * therefore check the version of the `flush' command first, so that later + * versions can change the order of fields and it's easier to implement + * backwards compatibility. */ + READ_NUMERIC_FIELD ("FlushVersion", unsigned long, flush_version); + if (flush_version != 1) + BAIL_OUT (-1, "Don't know how to handle flush format version %lu.", + flush_version); + + if (res->lines_num < 5) + BAIL_OUT (-1, "Premature end of response packet"); + + READ_NUMERIC_FIELD ("Start", time_t, start); + READ_NUMERIC_FIELD ("End", time_t, end); + if (start >= end) + BAIL_OUT (-1, "Malformed start and end times: start = %lu; end = %lu;", + (unsigned long) start, + (unsigned long) end); + + READ_NUMERIC_FIELD ("Step", unsigned long, step); + if (step < 1) + BAIL_OUT (-1, "Invalid number for Step: %lu", step); + + READ_NUMERIC_FIELD ("DSCount", unsigned long, ds_num); + if (ds_num < 1) + BAIL_OUT (-1, "Invalid number for DSCount: %lu", ds_num); + + /* It's time to allocate some memory */ + ds_names = calloc ((size_t) ds_num, sizeof (*ds_names)); + if (ds_names == NULL) + BAIL_OUT (-1, "Out of memory"); + + status = parse_char_array_header (res->lines[current_line], + &str_tmp, ds_names, (size_t) ds_num, /* alloc = */ 1); + if (status != 0) + BAIL_OUT (-1, "Unable to parse header `DSName'"); + if (strcasecmp ("DSName", str_tmp) != 0) + BAIL_OUT (-1, "Unexpected header line: Expected `DSName', got `%s'", str_tmp); + current_line++; + + data_size = ds_num * (end - start) / step; + if (data_size < 1) + BAIL_OUT (-1, "No data returned or headers invalid."); + + if (res->lines_num != (6 + (data_size / ds_num))) + BAIL_OUT (-1, "Got %zu lines, expected %zu", + res->lines_num, (6 + (data_size / ds_num))); + + data = calloc (data_size, sizeof (*data)); + if (data == NULL) + BAIL_OUT (-1, "Out of memory"); + + + data_fill = 0; + for (t = start + step; t <= end; t += step, current_line++) { - pthread_mutex_unlock (&lock); - return (ENODATA); - } + time_t tmp; - pthread_mutex_unlock (&lock); + assert (current_line < res->lines_num); - status = atoi (buffer); - return (status); -} /* }}} int rrdc_flush */ + status = parse_value_array_header (res->lines[current_line], + &tmp, data + data_fill, (size_t) ds_num); + if (status != 0) + BAIL_OUT (-1, "Cannot parse value line"); + data_fill += (size_t) ds_num; + } + *ret_start = start; + *ret_end = end; + *ret_step = step; + *ret_ds_num = ds_num; + *ret_ds_names = ds_names; + *ret_data = data; + + response_free (res); + return (0); +#undef READ_NUMERIC_FIELD +#undef BAIL_OUT +} /* }}} int rrdc_flush */ /* convenience function; if there is a daemon specified, or if we can * detect one from the environment, then flush the file. Otherwise, no-op @@ -602,13 +1038,23 @@ int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ * if (rrdc_is_connected(opt_daemon)) { + rrd_clear_error(); status = rrdc_flush (filename); - if (status != 0) + + if (status != 0 && !rrd_test_error()) { - rrd_set_error ("rrdc_flush (%s) failed with status %i.", - filename, status); + if (status > 0) + { + rrd_set_error("rrdc_flush (%s) failed: %s", + filename, rrd_strerror(status)); + } + else if (status < 0) + { + rrd_set_error("rrdc_flush (%s) failed with status %i.", + filename, status); + } } - } /* if (daemon_addr) */ + } /* if (rrdc_is_connected(..)) */ return status; } /* }}} int rrdc_flush_if_daemon */ @@ -619,21 +1065,11 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ rrdc_stats_t *head; rrdc_stats_t *tail; - rrdc_response_t *response; + rrdc_response_t *res; - char buffer[4096]; - size_t buffer_size; int status; size_t i; - pthread_mutex_lock (&lock); - - if (sd < 0) - { - pthread_mutex_unlock (&lock); - return (ENOTCONN); - } - /* Protocol example: {{{ * -> STATS * <- 5 Statistics follow @@ -643,63 +1079,31 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ * <- TreeNodesNumber: 0 * <- TreeDepth: 0 * }}} */ - status = swrite ("STATS\n", strlen ("STATS\n")); - if (status != 0) - { - pthread_mutex_unlock (&lock); - return (status); - } - - status = sread (buffer, sizeof (buffer)); - if (status < 0) - { - status = errno; - pthread_mutex_unlock (&lock); - return (status); - } - else if (status == 0) - { - pthread_mutex_unlock (&lock); - return (ENODATA); - } + res = NULL; + pthread_mutex_lock (&lock); + status = request ("STATS\n", strlen ("STATS\n"), &res); pthread_mutex_unlock (&lock); - /* Assert NULL termination */ - buffer_size = (size_t) status; - if (buffer[buffer_size - 1] != 0) - { - if (buffer_size < sizeof (buffer)) - { - buffer[buffer_size] = 0; - buffer_size++; - } - else - { - return (ENOBUFS); - } - } - - status = response_parse (buffer, buffer_size, &response); if (status != 0) return (status); - if (response->status <= 0) + if (res->status <= 0) { - response_free (response); + response_free (res); return (EIO); } head = NULL; tail = NULL; - for (i = 0; i < response->lines_num; i++) + for (i = 0; i < res->lines_num; i++) { char *key; char *value; char *endptr; rrdc_stats_t *s; - key = response->lines[i]; + key = res->lines[i]; value = strchr (key, ':'); if (value == NULL) continue; @@ -718,14 +1122,18 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ endptr = NULL; if ((strcmp ("QueueLength", key) == 0) - || (strcmp ("TreeNodesNumber", key) == 0) - || (strcmp ("TreeDepth", key) == 0)) + || (strcmp ("TreeDepth", key) == 0) + || (strcmp ("TreeNodesNumber", key) == 0)) { s->type = RRDC_STATS_TYPE_GAUGE; s->value.gauge = strtod (value, &endptr); } - else if ((strcmp ("UpdatesWritten", key) == 0) - || (strcmp ("DataSetsWritten", key) == 0)) + else if ((strcmp ("DataSetsWritten", key) == 0) + || (strcmp ("FlushesReceived", key) == 0) + || (strcmp ("JournalBytes", key) == 0) + || (strcmp ("JournalRotate", key) == 0) + || (strcmp ("UpdatesReceived", key) == 0) + || (strcmp ("UpdatesWritten", key) == 0)) { s->type = RRDC_STATS_TYPE_COUNTER; s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0); @@ -754,9 +1162,9 @@ int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ tail->next = s; tail = s; } - } /* for (i = 0; i < response->lines_num; i++) */ + } /* for (i = 0; i < res->lines_num; i++) */ - response_free (response); + response_free (res); if (head == NULL) return (EPROTO); @@ -778,7 +1186,7 @@ void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */ if (this->name != NULL) { - free (this->name); + free ((char *)this->name); this->name = NULL; } free (this);