RRDCacheD: Add the "FETCH" command. ff/rrdc_fetch
authorFlorian Forster <octo@noris.net>
Wed, 24 Feb 2010 16:34:41 +0000 (17:34 +0100)
committerFlorian Forster <octo@noris.net>
Wed, 24 Feb 2010 16:35:51 +0000 (17:35 +0100)
Add a "FETCH" command to RRDCacheD which behaves like a (simplified
version of) rrdfetch(1).

This has advantages over calling "FLUSH" from within the "client",
especially if the daemon is accessed using a network socket. For one, it
makes it easy to separate collecting and storing of data on one side and
creating graphs on another, possibly more public server. Without this
command this is only possible using networked file systems and similar
techniques.

When talking to an instance of RRDCacheD via a network socket, only
relative pathnames are allowed. If the RRD file is to be accessed
afterwards (why else would one call "FLUSH"?), the client has to be in a
specific directory so the *same* relative path can be used. If the file
is on a share mounted via the network, the required CWD may differ from
the CWD of the server, making developing and deploying solutions using
separated storing and graphing unnecessarily hard.

The data can be accessed using "rrdc_fetch" which should be a drop-in
replacement for "rrd_fetch_r". This makes it easy for programs using the
RRDtool C API to use this new functionality.

doc/rrdcached.pod
src/rrd_client.c
src/rrd_client.h
src/rrd_daemon.c

index 0fa12ca..5f7f0dd 100644 (file)
@@ -477,6 +477,13 @@ returns immediately, even though the writes may take a long time.
 Shows any "pending" updates for a file, in order.  The updates shown have
 not yet been written to the underlying RRD file.
 
+=item B<FETCH> I<filename> I<CF> [I<start> [I<end>]]
+
+Calls C<rrd_fetch> with the specified arguments and returns the result in text
+form. If necessary, the file is flushed to disk first. The client side function
+C<rrdc_fetch> (declared in C<rrd_client.h>) parses the output and behaves just
+like C<rrd_fetch_r> for easy integration of remote queries.
+
 =item B<FORGET> I<filename>
 
 Removes I<filename> from the cache.  Any pending updates B<WILL BE LOST>.
index 0b69000..04b54a8 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * RRDTool - src/rrd_client.c
- * Copyright (C) 2008 Florian octo Forster
+ * Copyright (C) 2008-2010  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -94,6 +94,137 @@ static const char *get_path (const char *path, char *resolved_path) /* {{{ */
   return (ret);
 } /* }}} char *get_path */
 
+static size_t strsplit (char *string, char **fields, size_t size) /* {{{ */
+{
+  size_t i;
+  char *ptr;
+  char *saveptr;
+
+  i = 0;
+  ptr = string;
+  saveptr = NULL;
+  while ((fields[i] = strtok_r (ptr, " \t\r\n", &saveptr)) != NULL)
+  {
+    ptr = NULL;
+    i++;
+
+    if (i >= size)
+      break;
+  }
+
+  return (i);
+} /* }}} size_t strsplit */
+
+static int parse_header (char *line, /* {{{ */
+    char **ret_key, char **ret_value)
+{
+  char *tmp;
+
+  *ret_key = line;
+
+  tmp = strchr (line, ':');
+  if (tmp == NULL)
+    return (-1);
+
+  do
+  {
+    *tmp = 0;
+    tmp++;
+  }
+  while ((tmp[0] == ' ') || (tmp[0] == '\t'));
+
+  if (*tmp == 0)
+    return (-1);
+
+  *ret_value = tmp;
+  return (0);
+} /* }}} int parse_header */
+
+static int parse_ulong_header (char *line, /* {{{ */
+    char **ret_key, unsigned long *ret_value)
+{
+  char *str_value;
+  char *endptr;
+  int status;
+
+  str_value = NULL;
+  status = parse_header (line, ret_key, &str_value);
+  if (status != 0)
+    return (status);
+
+  endptr = NULL;
+  errno = 0;
+  *ret_value = (unsigned long) strtol (str_value, &endptr, /* base = */ 0);
+  if ((endptr == str_value) || (errno != 0))
+    return (-1);
+
+  return (0);
+} /* }}} int parse_ulong_header */
+
+static int parse_char_array_header (char *line, /* {{{ */
+    char **ret_key, char **array, size_t array_len, int alloc)
+{
+  char *tmp_array[array_len];
+  char *value;
+  size_t num;
+  int status;
+
+  value = NULL;
+  status = parse_header (line, ret_key, &value);
+  if (status != 0)
+    return (-1);
+
+  num = strsplit (value, tmp_array, array_len);
+  if (num != array_len)
+    return (-1);
+
+  if (alloc == 0)
+  {
+    memcpy (array, tmp_array, sizeof (tmp_array));
+  }
+  else
+  {
+    size_t i;
+
+    for (i = 0; i < array_len; i++)
+      array[i] = strdup (tmp_array[i]);
+  }
+
+  return (0);
+} /* }}} int parse_char_array_header */
+
+static int parse_value_array_header (char *line, /* {{{ */
+    time_t *ret_time, rrd_value_t *array, size_t array_len)
+{
+  char *str_key;
+  char *str_array[array_len];
+  char *endptr;
+  int status;
+  size_t i;
+
+  str_key = NULL;
+  status = parse_char_array_header (line, &str_key,
+      str_array, array_len, /* alloc = */ 0);
+  if (status != 0)
+    return (-1);
+
+  errno = 0;
+  endptr = NULL;
+  *ret_time = (time_t) strtol (str_key, &endptr, /* base = */ 10);
+  if ((endptr == str_key) || (errno != 0))
+    return (-1);
+
+  for (i = 0; i < array_len; i++)
+  {
+    endptr = NULL;
+    array[i] = (rrd_value_t) strtod (str_array[i], &endptr);
+    if ((endptr == str_array[i]) || (errno != 0))
+      return (-1);
+  }
+
+  return (0);
+} /* }}} int parse_value_array_header */
+
 /* One must hold `lock' when calling `close_connection'. */
 static void close_connection (void) /* {{{ */
 {
@@ -674,6 +805,215 @@ int rrdc_flush (const char *filename) /* {{{ */
   return (status);
 } /* }}} int rrdc_flush */
 
+int rrdc_fetch (const char *filename, /* {{{ */
+    const char *cf,
+    time_t *ret_start, time_t *ret_end,
+    unsigned long *ret_step,
+    unsigned long *ret_ds_num,
+    char ***ret_ds_names,
+    rrd_value_t **ret_data)
+{
+  char buffer[4096];
+  char *buffer_ptr;
+  size_t buffer_free;
+  size_t buffer_size;
+  rrdc_response_t *res;
+  char path_buffer[PATH_MAX];
+  char *path_ptr;
+
+  char *str_tmp;
+  unsigned long flush_version;
+
+  time_t start;
+  time_t end;
+  unsigned long step;
+  unsigned long ds_num;
+  char **ds_names;
+
+  rrd_value_t *data;
+  size_t data_size;
+  size_t data_fill;
+
+  int status;
+  size_t current_line;
+  time_t t;
+
+  if ((filename == NULL) || (cf == NULL))
+    return (-1);
+
+  /* Send request {{{ */
+  memset (buffer, 0, sizeof (buffer));
+  buffer_ptr = &buffer[0];
+  buffer_free = sizeof (buffer);
+
+  status = buffer_add_string ("FETCH", &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  /* change to path for rrdcached */
+  path_ptr = get_path (filename, path_buffer);
+  if (path_ptr == NULL)
+    return (EINVAL);
+
+  status = buffer_add_string (path_ptr, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  status = buffer_add_string (cf, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  if ((ret_start != NULL) && (*ret_start > 0))
+  {
+    char tmp[64];
+    snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_start);
+    tmp[sizeof (tmp) - 1] = 0;
+    status = buffer_add_string (tmp, &buffer_ptr, &buffer_free);
+    if (status != 0)
+      return (ENOBUFS);
+
+    if ((ret_end != NULL) && (*ret_end > 0))
+    {
+      snprintf (tmp, sizeof (tmp), "%lu", (unsigned long) *ret_end);
+      tmp[sizeof (tmp) - 1] = 0;
+      status = buffer_add_string (tmp, &buffer_ptr, &buffer_free);
+      if (status != 0)
+        return (ENOBUFS);
+    }
+  }
+
+  assert (buffer_free < sizeof (buffer));
+  buffer_size = sizeof (buffer) - buffer_free;
+  assert (buffer[buffer_size - 1] == ' ');
+  buffer[buffer_size - 1] = '\n';
+
+  res = NULL;
+  status = request (buffer, buffer_size, &res);
+  if (status != 0)
+    return (status);
+
+  status = res->status;
+  if (status < 0)
+  {
+    rrd_set_error ("rrdcached: %s", res->message);
+    response_free (res);
+    return (status);
+  }
+  /* }}} Send request */
+
+  ds_names = NULL;
+  ds_num = 0;
+  data = NULL;
+  current_line = 0;
+
+  /* Macros to make error handling a little easier (i. e. less to type and
+   * read. `BAIL_OUT' sets the error message, frees all dynamically allocated
+   * variables and returns the provided status code. */
+#define BAIL_OUT(status, ...) do { \
+    rrd_set_error ("rrdc_fetch: " __VA_ARGS__); \
+    free (data); \
+    if (ds_names != 0) { size_t k; for (k = 0; k < ds_num; k++) free (ds_names[k]); } \
+    free (ds_names); \
+    response_free (res); \
+    return (status); \
+  } while (0)
+
+#define READ_NUMERIC_FIELD(name,type,var) do { \
+    char *key; \
+    unsigned long value; \
+    assert (current_line < res->lines_num); \
+    status = parse_ulong_header (res->lines[current_line], &key, &value); \
+    if (status != 0) \
+      BAIL_OUT (-1, "Unable to parse header `%s'", name); \
+    if (strcasecmp (key, name) != 0) \
+      BAIL_OUT (-1, "Unexpected header line: Expected `%s', got `%s'", name, key); \
+    var = (type) value; \
+    current_line++; \
+  } while (0)
+
+  if (res->lines_num < 1)
+    BAIL_OUT (-1, "Premature end of response packet");
+
+  /* We're making some very strong assumptions about the fields below. We
+   * therefore check the version of the `flush' command first, so that later
+   * versions can change the order of fields and it's easier to implement
+   * backwards compatibility. */
+  READ_NUMERIC_FIELD ("FlushVersion", unsigned long, flush_version);
+  if (flush_version != 1)
+    BAIL_OUT (-1, "Don't know how to handle flush format version %lu.",
+        flush_version);
+
+  if (res->lines_num < 5)
+    BAIL_OUT (-1, "Premature end of response packet");
+
+  READ_NUMERIC_FIELD ("Start", time_t, start);
+  READ_NUMERIC_FIELD ("End", time_t, end);
+  if (start >= end)
+    BAIL_OUT (-1, "Malformed start and end times: start = %lu; end = %lu;",
+        (unsigned long) start,
+        (unsigned long) end);
+
+  READ_NUMERIC_FIELD ("Step", unsigned long, step);
+  if (step < 1)
+    BAIL_OUT (-1, "Invalid number for Step: %lu", step);
+
+  READ_NUMERIC_FIELD ("DSCount", unsigned long, ds_num);
+  if (ds_num < 1)
+    BAIL_OUT (-1, "Invalid number for DSCount: %lu", ds_num);
+  
+  /* It's time to allocate some memory */
+  ds_names = calloc ((size_t) ds_num, sizeof (*ds_names));
+  if (ds_names == NULL)
+    BAIL_OUT (-1, "Out of memory");
+
+  status = parse_char_array_header (res->lines[current_line],
+      &str_tmp, ds_names, (size_t) ds_num, /* alloc = */ 1);
+  if (status != 0)
+    BAIL_OUT (-1, "Unable to parse header `DSName'");
+  if (strcasecmp ("DSName", str_tmp) != 0)
+    BAIL_OUT (-1, "Unexpected header line: Expected `DSName', got `%s'", str_tmp);
+  current_line++;
+
+  data_size = ds_num * (end - start) / step;
+  if (data_size < 1)
+    BAIL_OUT (-1, "No data returned or headers invalid.");
+
+  if (res->lines_num != (6 + (data_size / ds_num)))
+    BAIL_OUT (-1, "Got %zu lines, expected %zu",
+        res->lines_num, (6 + (data_size / ds_num)));
+
+  data = calloc (data_size, sizeof (*data));
+  if (data == NULL)
+    BAIL_OUT (-1, "Out of memory");
+  
+
+  data_fill = 0;
+  for (t = start + step; t <= end; t += step, current_line++)
+  {
+    time_t tmp;
+
+    assert (current_line < res->lines_num);
+
+    status = parse_value_array_header (res->lines[current_line],
+        &tmp, data + data_fill, (size_t) ds_num);
+    if (status != 0)
+      BAIL_OUT (-1, "Cannot parse value line");
+
+    data_fill += (size_t) ds_num;
+  }
+
+  *ret_start = start;
+  *ret_end = end;
+  *ret_step = step;
+  *ret_ds_num = ds_num;
+  *ret_ds_names = ds_names;
+  *ret_data = data;
+
+  response_free (res);
+  return (0);
+#undef READ_NUMERIC_FIELD
+#undef BAIL_OUT
+} /* }}} int rrdc_flush */
 
 /* convenience function; if there is a daemon specified, or if we can
  * detect one from the environment, then flush the file.  Otherwise, no-op
index 6c48dec..58f5473 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * RRDTool - src/rrd_client.h
- * Copyright (C) 2008 Florian octo Forster
+ * Copyright (C) 2008-2010  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -66,6 +66,14 @@ int rrdc_update (const char *filename, int values_num,
 int rrdc_flush (const char *filename);
 int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename);
 
+int rrdc_fetch (const char *filename,
+    const char *cf,
+    time_t *ret_start, time_t *ret_end,
+    unsigned long *ret_step,
+    unsigned long *ret_ds_num,
+    char ***ret_ds_names,
+    rrd_value_t **ret_data);
+
 #else
 #      define rrdc_flush_if_daemon(a,b) 0
 #      define rrdc_connect(a) 0
index 4c84f19..ecb8261 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * RRDTool - src/rrd_daemon.c
- * Copyright (C) 2008,2009 Florian octo Forster
+ * Copyright (C) 2008-2010 Florian octo Forster
  * Copyright (C) 2008,2009 Kevin Brintnall
  *
  * This program is free software; you can redistribute it and/or modify it
@@ -1463,6 +1463,176 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */
 
 } /* }}} int handle_request_update */
 
+static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
+{
+  char *file;
+  char *cf;
+
+  char *start_str;
+  char *end_str;
+  rrd_time_value_t start_tv;
+  rrd_time_value_t end_tv;
+  time_t start_tm;
+  time_t end_tm;
+
+  unsigned long step;
+  unsigned long ds_cnt;
+  char **ds_namv;
+  rrd_value_t *data;
+
+  int status;
+  unsigned long i;
+  time_t t;
+  rrd_value_t *data_ptr;
+
+  file = NULL;
+  cf = NULL;
+  start_str = NULL;
+  end_str = NULL;
+
+  /* Read the arguments */
+  do /* while (0) */
+  {
+    status = buffer_get_field (&buffer, &buffer_size, &file);
+    if (status != 0)
+      break;
+
+    status = buffer_get_field (&buffer, &buffer_size, &cf);
+    if (status != 0)
+      break;
+
+    status = buffer_get_field (&buffer, &buffer_size, &start_str);
+    if (status != 0)
+    {
+      start_str = NULL;
+      status = 0;
+      break;
+    }
+
+    status = buffer_get_field (&buffer, &buffer_size, &end_str);
+    if (status != 0)
+    {
+      end_str = NULL;
+      status = 0;
+      break;
+    }
+  } while (0);
+
+  if (status != 0)
+    return (syntax_error(sock,cmd));
+
+  status = flush_file (file);
+  if ((status != 0) && (status != ENOENT))
+    return (send_response (sock, RESP_ERR,
+          "flush_file (%s) failed with status %i.\n", file, status));
+
+  /* Parse start time */
+  if (start_str != NULL)
+  {
+    const char *errmsg;
+
+    errmsg = rrd_parsetime (start_str, &start_tv);
+    if (errmsg != NULL)
+      return (send_response(sock, RESP_ERR,
+            "Cannot parse start time `%s': %s\n", start_str, errmsg));
+  }
+  else
+    rrd_parsetime ("-86400", &start_tv);
+
+  /* Parse end time */
+  if (end_str != NULL)
+  {
+    const char *errmsg;
+
+    errmsg = rrd_parsetime (end_str, &end_tv);
+    if (errmsg != NULL)
+      return (send_response(sock, RESP_ERR,
+            "Cannot parse end time `%s': %s\n", end_str, errmsg));
+  }
+  else
+    rrd_parsetime ("now", &end_tv);
+
+  start_tm = 0;
+  end_tm = 0;
+  status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm);
+  if (status != 0)
+    return (send_response(sock, RESP_ERR,
+          "rrd_proc_start_end failed: %s\n", rrd_get_error ()));
+
+  step = -1;
+  ds_cnt = 0;
+  ds_namv = NULL;
+  data = NULL;
+
+  status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
+      &ds_cnt, &ds_namv, &data);
+  if (status != 0)
+    return (send_response(sock, RESP_ERR,
+          "rrd_fetch_r failed: %s\n", rrd_get_error ()));
+
+  add_response_info (sock, "FlushVersion: %lu\n", 1);
+  add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
+  add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
+  add_response_info (sock, "Step: %lu\n", step);
+  add_response_info (sock, "DSCount: %lu\n", ds_cnt);
+
+#define SSTRCAT(buffer,str,buffer_fill) do { \
+    size_t str_len = strlen (str); \
+    if ((buffer_fill + str_len) > sizeof (buffer)) \
+      str_len = sizeof (buffer) - buffer_fill; \
+    if (str_len > 0) { \
+      strncpy (buffer + buffer_fill, str, str_len); \
+      buffer_fill += str_len; \
+      assert (buffer_fill <= sizeof (buffer)); \
+      if (buffer_fill == sizeof (buffer)) \
+        buffer[buffer_fill - 1] = 0; \
+      else \
+        buffer[buffer_fill] = 0; \
+    } \
+  } while (0)
+
+  { /* Add list of DS names */
+    char linebuf[1024];
+    size_t linebuf_fill;
+
+    memset (linebuf, 0, sizeof (linebuf));
+    linebuf_fill = 0;
+    for (i = 0; i < ds_cnt; i++)
+    {
+      if (i > 0)
+        SSTRCAT (linebuf, " ", linebuf_fill);
+      SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
+    }
+    add_response_info (sock, "DSName: %s\n", linebuf);
+  }
+
+  /* Add the actual data */
+  assert (step > 0);
+  data_ptr = data;
+  for (t = start_tm + step; t <= end_tm; t += step)
+  {
+    char linebuf[1024];
+    size_t linebuf_fill;
+    char tmp[128];
+
+    memset (linebuf, 0, sizeof (linebuf));
+    linebuf_fill = 0;
+    for (i = 0; i < ds_cnt; i++)
+    {
+      snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
+      tmp[sizeof (tmp) - 1] = 0;
+      SSTRCAT (linebuf, tmp, linebuf_fill);
+
+      data_ptr++;
+    }
+
+    add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
+  } /* for (t) */
+
+  return (send_response (sock, RESP_OK, "Success\n"));
+#undef SSTRCAT
+} /* }}} int handle_request_fetch */
+
 /* we came across a "WROTE" entry during journal replay.
  * throw away any values that we have accumulated for this file
  */
@@ -1635,6 +1805,14 @@ static command_t list_of_commands[] = { /* {{{ */
     NULL
   },
   {
+    "FETCH",
+    handle_request_fetch,
+    CMD_CONTEXT_CLIENT,
+    "FETCH <file> <CF> [<start> [<end>]]\n"
+    ,
+    "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
+  },
+  {
     "QUIT",
     handle_request_quit,
     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,