Merge branch 'master' into ff/rrdd
authorFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 29 Jun 2008 21:38:36 +0000 (23:38 +0200)
committerFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 29 Jun 2008 21:38:36 +0000 (23:38 +0200)
18 files changed:
CONTRIBUTORS
configure.ac
doc/Makefile.am
doc/rrdcached.pod [new file with mode: 0644]
doc/rrdfetch.pod
doc/rrdgraph.pod
doc/rrdupdate.pod
src/Makefile.am
src/rrd.h
src/rrd_client.c [new file with mode: 0644]
src/rrd_client.h [new file with mode: 0644]
src/rrd_daemon.c [new file with mode: 0644]
src/rrd_fetch.c
src/rrd_graph.c
src/rrd_graph.h
src/rrd_tool.c
src/rrd_tool.h
src/rrd_update.c

index 847c1d1..a2dc232 100644 (file)
@@ -21,7 +21,7 @@ David Grimes <dgrimes with navisite.com> SQRT/SORT/REV/SHIFT/TREND
 David L. Barker <dave with ncomtech.com> xport function bug fixes
 Evan Miller <emiller with imvu.com> Multiplicative HW Enhancements
 Frank Strauss <strauss with escape.de> TCL bindings
-Florian octo Forster <rrdtool nospam.verplant.org> rrd_restore libxml2 rewrite deprecated function export
+Florian octo Forster <rrdtool nospam.verplant.org> rrd_restore libxml2 rewrite, deprecated function export, rrdcached
 Henrik Storner <henrik with hswn.dk> functions for min/max values of data in graph
 Hermann Hueni <hueni with glue.ch> (SunOS porting)
 Jakob Ilves <jilves with se.oracle.com> HPUX 11
index 7a15b5c..822626e 100644 (file)
@@ -355,8 +355,9 @@ CONFIGURE_PART(Audit Compilation Environment)
 
 
 dnl Check for the compiler and static/shared library creation.
-AC_PROG_CC
 AC_PROG_CPP
+AC_PROG_CC
+AM_PROG_CC_C_O
 AC_PROG_LIBTOOL
 
 dnl Try to detect/use GNU features
@@ -674,7 +675,7 @@ EX_CHECK_ALL(cairo,      cairo_font_options_create,     cairo.h,
 EX_CHECK_ALL(cairo,      cairo_svg_surface_create,      cairo-svg.h,            cairo-svg,   1.4.6,  http://cairographics.org/releases/, "")
 EX_CHECK_ALL(cairo,      cairo_pdf_surface_create,      cairo-pdf.h,            cairo-pdf,   1.4.6,  http://cairographics.org/releases/, "")
 EX_CHECK_ALL(cairo,      cairo_ps_surface_create,       cairo-ps.h,             cairo-ps,    1.4.6,  http://cairographics.org/releases/, "")
-dnl EX_CHECK_ALL(glib-2.0,   glib_check_version,        glib.h,                 glib-2.0,    2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "")
+EX_CHECK_ALL(glib-2.0,   glib_check_version,            glib.h,                 glib-2.0,    2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "")
 EX_CHECK_ALL(pango-1.0,  pango_cairo_context_set_font_options,  pango/pango.h,  pangocairo,  1.17,    http://ftp.gnome.org/pub/GNOME/sources/pango/1.17, "")
 EX_CHECK_ALL(xml2,       xmlParseFile,                  libxml/parser.h,        libxml-2.0,        2.6.31,  http://xmlsoft.org/downloads.html, /usr/include/libxml2)
 
index 16fd617..05d637d 100644 (file)
@@ -10,7 +10,7 @@ CLEANFILES = *.1 *.html *.txt *-dircache RRD?.pod *.pdf *~ core *itemcache *.rej
 
 POD = bin_dec_hex.pod        rrddump.pod            rrdgraph_examples.pod  rrdrestore.pod         rrdupdate.pod  \
       cdeftutorial.pod       rrdfetch.pod           rrdgraph_graph.pod     rrdthreads.pod         rrdxport.pod   \
-      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod                           \
+      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod            rrdcached.pod       \
       rrd-beginners.pod      rrdinfo.pod            rrdtune.pod            rrdbuild.pod                          \
       rrdcgi.pod             rrdgraph.pod           rrdlast.pod            rrdlastupdate.pod                     \
       rrdcreate.pod          rrdgraph_data.pod      rrdresize.pod          rrdtutorial.pod                       
diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod
new file mode 100644 (file)
index 0000000..297f009
--- /dev/null
@@ -0,0 +1,64 @@
+=pod
+
+=head1 NAME
+
+rrdcached - Data caching daemon for rrdtool
+
+=head1 SYNOPSIS
+
+B<rrdcached> [B<-l> I<address>] [B<-w> I<timeout>] [B<-f> I<timeout>]
+
+=head1 DESCRIPTION
+
+B<rrdcached> is a daemon that receives updates to existing RRD files,
+accumulates them and, if enough have been received or a defined time has
+passed, writes the updates to the RRD file. A I<flush> command may be used to
+force writing of values to disk, so that graphing facilities and similar can
+work with up-to-date data.
+
+=head1 OPTIONS
+
+=over 4
+
+=item B<-l> I<address>
+
+Tells the daemon to bind to I<address> and accept incoming connections on that
+socket. If I<address> begins with C<unix:>, everything following that prefix is
+interpreted as the path to a UNIX domain socket. Otherwise the address or node
+name are resolved using L<getaddrinfo(3)>.
+
+If the B<-l> option is not specified the default address,
+C<unix:/tmp/rrdcached.sock>, will be used.
+
+=item B<-w> I<timeout>
+
+Data is written to disk every I<timeout> seconds. If this option is not
+specified the default interval of 300E<nbsp>seconds will be used.
+
+=item B<-f> I<timeout>
+
+Every I<timeout> seconds the entire cache is searched for old values which are
+written to disk. This only concerns files to which updates have stopped, so
+setting this to a high value, such as 3600E<nbsp>seconds, is acceptable in most
+cases. This timeout defaults to 3600E<nbsp>seconds.
+
+=back
+
+=head1 BUGS
+
+=over 4
+
+=item
+
+Base directory is currently hard coded. The daemon will chdir to C</tmp/>.
+
+=back
+
+=head1 SEE ALSO
+
+L<rrdtool(1)>, L<rrdgraph(1)>
+
+=head1 AUHOR
+
+B<rrdcached> and this manual page have been written by Florian Forster
+E<lt>octoE<nbsp>atE<nbsp>verplant.orgE<gt>.
index 51b5ccd..9ef417a 100644 (file)
@@ -48,6 +48,17 @@ the end of the time series in seconds since epoch. See also AT-STYLE
 TIME SPECIFICATION section for a detailed explanation of how to
 specify the end time.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool fetch --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd AVERAGE
+
 =back
 
 =head2 RESOLUTION INTERVAL
@@ -257,6 +268,21 @@ I<931225537> -- 18:45  July 5th, 1999
 I<19970703 12:45> -- 12:45  July 3th, 1997
 (my favorite, and its even got an ISO number (8601)).
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>fetch>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 AUTHOR
 
 Tobias Oetiker <tobi@oetiker.ch>
index 092e37f..99b2e6b 100644 (file)
@@ -248,7 +248,7 @@ to the more robust B<--alt-y-grid> mode.
 
 How many digits should rrdtool assume the y-axis labels to be? You
 may have to use this option to make enough space once you start
-fideling with the y-axis labeling.
+fiddling with the y-axis labeling.
 
 [B<--units=si>]
 
@@ -267,6 +267,17 @@ Note, that only the image size will be returned, if you run with lazy even
 when using graphv.
 
 
+[B<--daemon> I<address>]
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows the graph to contain
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool graph [...] --daemon unix:/var/run/rrdcached.sock [...]
+
 [B<-f>|B<--imginfo> I<printfstr>]
 
 After the image has been created, the graph function uses printf
@@ -441,8 +452,6 @@ at least one print statement to generate a report.
 See L<rrdgraph_graph> for the exact format.
 
 
-=back
-
 =head2 graphv
 
 Calling rrdtool with the graphv option will return information in the
@@ -471,6 +480,21 @@ There is more information returned than in the standard interface.
 Especially the 'graph_*' keys are new. They help applications that want to
 know what is where on the graph.
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>graph>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 SEE ALSO
 
 L<rrdgraph> gives an overview of how B<rrdtool graph> works.
index cc0b452..d61cf12 100644 (file)
@@ -6,6 +6,7 @@ rrdupdate - Store a new set of values into the RRD
 
 B<rrdtool> {B<update> | B<updatev>} I<filename>
 S<[B<--template>|B<-t> I<ds-name>[B<:>I<ds-name>]...]>
+S<[B<--daemon> I<address>]>
 S<B<N>|I<timestamp>B<:>I<value>[B<:>I<value>...]>
 S<I<at-timestamp>B<@>I<value>[B<:>I<value>...]>
 S<[I<timestamp>B<:>I<value>[B<:>I<value>...] ...]>
@@ -29,6 +30,9 @@ RRA (consolidation function and PDPs per CDP), and data source (name).
 Note that depending on the arguments of the current and previous call to
 update, the list may have no entries or a large number of entries.
 
+Since B<updatev> requires direct disk access, the B<--daemon> option cannot be
+used with this command.
+
 =item I<filename>
 
 The name of the B<RRD> you want to update.
@@ -56,6 +60,18 @@ function. If this is done accidentally (and this can only be done
 using the template switch), B<RRDtool> will ignore the value specified
 for the COMPUTE B<DST>.
 
+=item B<--daemon> I<address>
+
+If given, B<RRDTool> will try to connect to the caching daemon L<rrdcached(1)>
+at I<address> and will fail if the connection cannot be established. If the
+connection is successfully established the values will be sent to the daemon
+instead of accessing the files directly. If I<address> begins with C<unix:>
+then everything after this prefix will be considered to be a UNIX domain
+socket, see L<EXAMPLES> below. Otherwise the address is interpreted as network
+address or node name as understood by L<getaddrinfo(3)>. One practical
+consequence is that both, IPv4 and IPv6, may be used if the system supports
+it. This option is available for the B<update> command only.
+
 =item B<N>|I<timestamp>B<:>I<value>[B<:>I<value>...]
 
 The data used for updating the RRD was acquired at a certain
@@ -82,20 +98,65 @@ separator.
 
 =back
 
-=head1 EXAMPLE
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>update>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
+=head1 EXAMPLES
+
+=over 4
+
+=item
 
 C<rrdtool update demo1.rrd N:3.44:3.15:U:23>
 
 Update the database file demo1.rrd with 3 known and one I<*UNKNOWN*>
 value. Use the current time as the update time.
 
+=item
+
 C<rrdtool update demo2.rrd 887457267:U 887457521:22 887457903:2.7>
 
 Update the database file demo2.rrd which expects data from a single
 data-source, three times. First with an I<*UNKNOWN*> value then with two
 regular readings. The update interval seems to be around 300 seconds.
 
-=head1 AUTHOR
+=item
+
+C<rrdtool update --cache /var/lib/rrd/demo3.rrd N:42>
+
+Update the file C</var/lib/rrd/demo3.rrd> with a single data source, using the
+current time. If the caching daemon cannot be reached, do B<not> fall back to
+direct file access.
+
+=item
+
+C<rrdtool update --daemon unix:/tmp/rrdd.sock demo4.rrd N:23>
+
+Use the UNIX domain socket C</tmp/rrdd.sock> to contact the caching daemon. If
+the caching daemon is not available, update the file C<demo4.rrd> directly.
+B<WARNING:> Since a relative path is specified, the following disturbing effect
+may occur: If the daemon is available, the file relative to the working
+directory B<of the daemon> is used. If the daemon is not available, the file
+relative to the current working directory of the invoking process is used.
+B<This may update two different files depending on whether the daemon could be
+reached or not.> Don't do relative paths, kids!
+
+=back
+
+=head1 AUTHORS
 
-Tobias Oetiker <tobi@oetiker.ch>
+Tobias Oetiker <tobi@oetiker.ch>,
+Florian Forster <octoE<nbsp>atE<nbsp>verplant.org>
 
index c567679..7b97fff 100644 (file)
@@ -25,6 +25,7 @@ UPD_C_FILES =         \
        rrd_info.c      \
        rrd_error.c     \
        rrd_open.c      \
+       rrd_client.c    \
        rrd_nan_inf.c   \
        rrd_rpncalc.c   \
        rrd_update.c
@@ -81,7 +82,7 @@ librrd_th_la_LIBADD          = $(ALL_LIBS)
 
 include_HEADERS        = rrd.h rrd_format.h
 
-bin_PROGRAMS   = rrdtool rrdupdate
+bin_PROGRAMS   = rrdtool rrdupdate rrdcached
 
 if BUILD_RRDCGI
 bin_PROGRAMS += rrdcgi
@@ -97,6 +98,11 @@ rrdtool_SOURCES = rrd_tool.c
 rrdtool_DEPENDENCIES = librrd.la
 rrdtool_LDADD  = librrd.la
 
+rrdcached_SOURCES = rrd_daemon.c
+rrdcached_DEPENDENCIES = librrd.la
+rrdcached_CPPFLAGS = -DVERSION='"$(VERSION)"' -DLOCALSTATEDIR='"$(localstatedir)"'
+rrdcached_LDADD = librrd.la
+
 # strftime is here because we do not usually need it. unices have propper
 # iso date support
 EXTRA_DIST= strftime.c strftime.h \
index 31fd468..daed0e2 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -217,15 +217,16 @@ extern    "C" {
     const char *_template,
     int argc,
     const char **argv);
-    int       rrd_fetch_r(
-    const char *filename,
-    const char *cf,
-    time_t *start,
-    time_t *end,
-    unsigned long *step,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    rrd_value_t **data);
+    int rrd_fetch_r (
+            const char *filename,
+            const char *cf,
+            time_t *start,
+            time_t *end,
+            unsigned long *step,
+            const char *daemon,
+            unsigned long *ds_cnt,
+            char ***ds_namv,
+            rrd_value_t **data);
     int       rrd_dump_r(
     const char *filename,
     char *outname);
diff --git a/src/rrd_client.c b/src/rrd_client.c
new file mode 100644 (file)
index 0000000..d9a7468
--- /dev/null
@@ -0,0 +1,432 @@
+/**
+ * RRDTool - src/rrd_client.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#include "rrd.h"
+#include "rrd_client.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
+
+static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+static int sd = -1;
+
+static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */
+{
+  char    *buffer;
+  size_t   buffer_used;
+  size_t   buffer_free;
+  ssize_t  status;
+
+  buffer       = (char *) buffer_void;
+  buffer_used  = 0;
+  buffer_free  = buffer_size;
+
+  while (buffer_free > 0)
+  {
+    status = read (sd, buffer + buffer_used, buffer_free);
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (-1);
+
+    if (status == 0)
+    {
+      close (sd);
+      sd = -1;
+      errno = EPROTO;
+      return (-1);
+    }
+
+    assert ((0 > status) || (buffer_free >= (size_t) status));
+
+    buffer_free = buffer_free - status;
+    buffer_used = buffer_used + status;
+
+    if (buffer[buffer_used - 1] == '\n')
+      break;
+  }
+
+  if (buffer[buffer_used - 1] != '\n')
+  {
+    errno = ENOBUFS;
+    return (-1);
+  }
+
+  buffer[buffer_used - 1] = 0;
+  return (buffer_used);
+} /* }}} ssize_t sread */
+
+static ssize_t swrite (const void *buf, size_t count) /* {{{ */
+{
+  const char *ptr;
+  size_t      nleft;
+  ssize_t     status;
+
+  ptr   = (const char *) buf;
+  nleft = count;
+
+  while (nleft > 0)
+  {
+    status = write (sd, (const void *) ptr, nleft);
+
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (status);
+
+    nleft = nleft - status;
+    ptr   = ptr   + status;
+  }
+
+  return (0);
+} /* }}} ssize_t swrite */
+
+static int buffer_add_string (const char *str, /* {{{ */
+    char **buffer_ret, size_t *buffer_size_ret)
+{
+  char *buffer;
+  size_t buffer_size;
+  size_t buffer_pos;
+  size_t i;
+  int status;
+
+  buffer = *buffer_ret;
+  buffer_size = *buffer_size_ret;
+  buffer_pos = 0;
+
+  i = 0;
+  status = -1;
+  while (buffer_pos < buffer_size)
+  {
+    if (str[i] == 0)
+    {
+      buffer[buffer_pos] = ' ';
+      buffer_pos++;
+      status = 0;
+      break;
+    }
+    else if ((str[i] == ' ') || (str[i] == '\\'))
+    {
+      if (buffer_pos >= (buffer_size - 1))
+        break;
+      buffer[buffer_pos] = '\\';
+      buffer_pos++;
+      buffer[buffer_pos] = str[i];
+      buffer_pos++;
+    }
+    else
+    {
+      buffer[buffer_pos] = str[i];
+      buffer_pos++;
+    }
+    i++;
+  } /* while (buffer_pos < buffer_size) */
+
+  if (status != 0)
+    return (-1);
+
+  *buffer_ret = buffer + buffer_pos;
+  *buffer_size_ret = buffer_size - buffer_pos;
+
+  return (0);
+} /* }}} int buffer_add_string */
+
+static int buffer_add_value (const char *value, /* {{{ */
+    char **buffer_ret, size_t *buffer_size_ret)
+{
+  char temp[4096];
+
+  if (strncmp (value, "N:", 2) == 0)
+    snprintf (temp, sizeof (temp), "%lu:%s",
+        (unsigned long) time (NULL), value + 2);
+  else
+    strncpy (temp, value, sizeof (temp));
+  temp[sizeof (temp) - 1] = 0;
+
+  return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
+} /* }}} int buffer_add_value */
+
+static int rrdc_connect_unix (const char *path) /* {{{ */
+{
+  struct sockaddr_un sa;
+  int status;
+
+  assert (path != NULL);
+
+  pthread_mutex_lock (&lock);
+
+  if (sd >= 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (0);
+  }
+
+  sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
+  if (sd < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sun_family = AF_UNIX;
+  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
+
+  status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
+  if (status != 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  return (0);
+} /* }}} int rrdc_connect_unix */
+
+int rrdc_connect (const char *addr) /* {{{ */
+{
+  struct addrinfo ai_hints;
+  struct addrinfo *ai_res;
+  struct addrinfo *ai_ptr;
+  int status;
+
+  if (addr == NULL)
+    addr = RRDCACHED_DEFAULT_ADDRESS;
+
+  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
+    return (rrdc_connect_unix (addr + strlen ("unix:")));
+  else if (addr[0] == '/')
+    return (rrdc_connect_unix (addr));
+
+  pthread_mutex_lock (&lock);
+
+  if (sd >= 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (0);
+  }
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+
+  ai_res = NULL;
+  status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (sd < 0)
+    {
+      status = errno;
+      sd = -1;
+      continue;
+    }
+
+    status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      status = errno;
+      close (sd);
+      sd = -1;
+      continue;
+    }
+
+    assert (status == 0);
+    break;
+  } /* for (ai_ptr) */
+  pthread_mutex_unlock (&lock);
+
+  return (status);
+} /* }}} int rrdc_connect */
+
+int rrdc_disconnect (void) /* {{{ */
+{
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (0);
+  }
+
+  close (sd);
+  sd = -1;
+
+  pthread_mutex_unlock (&lock);
+
+  return (0);
+} /* }}} int rrdc_disconnect */
+
+int rrdc_update (const char *filename, int values_num, /* {{{ */
+               const char * const *values)
+{
+  char buffer[4096];
+  char *buffer_ptr;
+  size_t buffer_free;
+  size_t buffer_size;
+  int status;
+  int i;
+
+  memset (buffer, 0, sizeof (buffer));
+  buffer_ptr = &buffer[0];
+  buffer_free = sizeof (buffer);
+
+  status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  for (i = 0; i < values_num; i++)
+  {
+    status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
+    if (status != 0)
+      return (ENOBUFS);
+  }
+
+  assert (buffer_free < sizeof (buffer));
+  buffer_size = sizeof (buffer) - buffer_free;
+  assert (buffer[buffer_size - 1] == ' ');
+  buffer[buffer_size - 1] = '\n';
+
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENOTCONN);
+  }
+
+  status = swrite (buffer, buffer_size);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  status = sread (buffer, sizeof (buffer));
+  if (status < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+  else if (status == 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENODATA);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  status = atoi (buffer);
+  return (status);
+} /* }}} int rrdc_update */
+
+int rrdc_flush (const char *filename) /* {{{ */
+{
+  char buffer[4096];
+  char *buffer_ptr;
+  size_t buffer_free;
+  size_t buffer_size;
+  int status;
+
+  if (filename == NULL)
+    return (-1);
+
+  memset (buffer, 0, sizeof (buffer));
+  buffer_ptr = &buffer[0];
+  buffer_free = sizeof (buffer);
+
+  status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  assert (buffer_free < sizeof (buffer));
+  buffer_size = sizeof (buffer) - buffer_free;
+  assert (buffer[buffer_size - 1] == ' ');
+  buffer[buffer_size - 1] = '\n';
+
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENOTCONN);
+  }
+
+  status = swrite (buffer, buffer_size);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  status = sread (buffer, sizeof (buffer));
+  if (status < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+  else if (status == 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENODATA);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  status = atoi (buffer);
+  return (status);
+} /* }}} int rrdc_flush */
+
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */
diff --git a/src/rrd_client.h b/src/rrd_client.h
new file mode 100644 (file)
index 0000000..92d4c07
--- /dev/null
@@ -0,0 +1,40 @@
+/**
+ * RRDTool - src/rrd_client.h
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#ifndef __RRD_CLIENT_H
+#define __RRD_CLIENT_H 1
+
+#ifndef RRDCACHED_DEFAULT_ADDRESS
+# define RRDCACHED_DEFAULT_ADDRESS "unix:/tmp/rrdcached.sock"
+#endif
+
+#define RRDCACHED_DEFAULT_PORT "42217"
+#define ENV_RRDCACHED_ADDRESS "RRDCACHED_ADDRESS"
+
+int rrdc_connect (const char *addr);
+int rrdc_disconnect (void);
+
+int rrdc_update (const char *filename, int values_num,
+        const char * const *values);
+
+int rrdc_flush (const char *filename);
+
+#endif /* __RRD_CLIENT_H */
diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c
new file mode 100644 (file)
index 0000000..87ac8e9
--- /dev/null
@@ -0,0 +1,1350 @@
+/**
+ * RRDTool - src/rrd_daemon.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+/*
+ * First tell the compiler to stick to the C99 and POSIX standards as close as
+ * possible.
+ */
+#ifndef __STRICT_ANSI__ /* {{{ */
+# define __STRICT_ANSI__
+#endif
+
+#ifndef _ISOC99_SOURCE
+# define _ISOC99_SOURCE
+#endif
+
+#ifdef _POSIX_C_SOURCE
+# undef _POSIX_C_SOURCE
+#endif
+#define _POSIX_C_SOURCE 200112L
+
+/* Single UNIX needed for strdup. */
+#ifdef _XOPEN_SOURCE
+# undef _XOPEN_SOURCE
+#endif
+#define _XOPEN_SOURCE 500
+
+#ifndef _REENTRANT
+# define _REENTRANT
+#endif
+
+#ifndef _THREAD_SAFE
+# define _THREAD_SAFE
+#endif
+
+#ifdef _GNU_SOURCE
+# undef _GNU_SOURCE
+#endif
+/* }}} */
+
+/*
+ * Now for some includes..
+ */
+#include "rrd.h" /* {{{ */
+#include "rrd_client.h"
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
+#include <poll.h>
+#include <syslog.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <glib-2.0/glib.h>
+/* }}} */
+
+#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
+
+#ifndef __GNUC__
+# define __attribute__(x) /**/
+#endif
+
+/*
+ * Types
+ */
+struct listen_socket_s
+{
+  int fd;
+  char path[PATH_MAX + 1];
+};
+typedef struct listen_socket_s listen_socket_t;
+
+struct cache_item_s;
+typedef struct cache_item_s cache_item_t;
+struct cache_item_s
+{
+  char *file;
+  char **values;
+  int values_num;
+  time_t last_flush_time;
+#define CI_FLAGS_IN_TREE  0x01
+#define CI_FLAGS_IN_QUEUE 0x02
+  int flags;
+
+  cache_item_t *next;
+};
+
+enum queue_side_e
+{
+  HEAD,
+  TAIL
+};
+typedef enum queue_side_e queue_side_t;
+
+/*
+ * Variables
+ */
+static listen_socket_t *listen_fds = NULL;
+static size_t listen_fds_num = 0;
+
+static int do_shutdown = 0;
+
+static pthread_t queue_thread;
+
+static pthread_t *connetion_threads = NULL;
+static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static int connetion_threads_num = 0;
+
+/* Cache stuff */
+static GTree          *cache_tree = NULL;
+static cache_item_t   *cache_queue_head = NULL;
+static cache_item_t   *cache_queue_tail = NULL;
+static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
+
+static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
+
+static int config_write_interval = 300;
+static int config_flush_interval = 3600;
+static char *config_pid_file = NULL;
+static char *config_base_dir = NULL;
+
+static char **config_listen_address_list = NULL;
+static int config_listen_address_list_len = 0;
+
+/* 
+ * Functions
+ */
+static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_int_handler */
+
+static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_term_handler */
+
+static int write_pidfile (void) /* {{{ */
+{
+  pid_t pid;
+  char *file;
+  FILE *fh;
+
+  pid = getpid ();
+  
+  file = (config_pid_file != NULL)
+    ? config_pid_file
+    : LOCALSTATEDIR "/run/rrdcached.pid";
+
+  fh = fopen (file, "w");
+  if (fh == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
+    return (-1);
+  }
+
+  fprintf (fh, "%i\n", (int) pid);
+  fclose (fh);
+
+  return (0);
+} /* }}} int write_pidfile */
+
+static int remove_pidfile (void) /* {{{ */
+{
+  char *file;
+  int status;
+
+  file = (config_pid_file != NULL)
+    ? config_pid_file
+    : LOCALSTATEDIR "/run/rrdcached.pid";
+
+  status = unlink (file);
+  if (status == 0)
+    return (0);
+  return (errno);
+} /* }}} int remove_pidfile */
+
+/*
+ * enqueue_cache_item:
+ * `cache_lock' must be acquired before calling this function!
+ */
+static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
+    queue_side_t side)
+{
+  RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
+      ci->file);
+
+  if (ci == NULL)
+    return (-1);
+
+  if (ci->values_num == 0)
+    return (0);
+
+  if (side == HEAD)
+  {
+    if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+    {
+      assert (ci->next == NULL);
+      ci->next = cache_queue_head;
+      cache_queue_head = ci;
+
+      if (cache_queue_tail == NULL)
+        cache_queue_tail = cache_queue_head;
+    }
+    else if (cache_queue_head == ci)
+    {
+      /* do nothing */
+    }
+    else /* enqueued, but not first entry */
+    {
+      cache_item_t *prev;
+
+      /* find previous entry */
+      for (prev = cache_queue_head; prev != NULL; prev = prev->next)
+        if (prev->next == ci)
+          break;
+      assert (prev != NULL);
+
+      /* move to the front */
+      prev->next = ci->next;
+      ci->next = cache_queue_head;
+      cache_queue_head = ci;
+
+      /* check if we need to adapt the tail */
+      if (cache_queue_tail == ci)
+        cache_queue_tail = prev;
+    }
+  }
+  else /* (side == TAIL) */
+  {
+    /* We don't move values back in the list.. */
+    if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+      return (0);
+
+    assert (ci->next == NULL);
+
+    if (cache_queue_tail == NULL)
+      cache_queue_head = ci;
+    else
+      cache_queue_tail->next = ci;
+    cache_queue_tail = ci;
+  }
+
+  ci->flags |= CI_FLAGS_IN_QUEUE;
+
+  return (0);
+} /* }}} int enqueue_cache_item */
+
+/*
+ * tree_callback_flush:
+ * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
+ * while this is in progress.
+ */
+static gboolean tree_callback_flush (gpointer key /* {{{ */
+    __attribute__((unused)), gpointer value, gpointer data)
+{
+  cache_item_t *ci;
+  time_t now;
+
+  key = NULL; /* make compiler happy */
+
+  ci = (cache_item_t *) value;
+  now = *((time_t *) data);
+
+  if (((now - ci->last_flush_time) >= config_write_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+    enqueue_cache_item (ci, TAIL);
+
+  return (TRUE);
+} /* }}} gboolean tree_callback_flush */
+
+static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct timeval now;
+  struct timespec next_flush;
+
+  gettimeofday (&now, NULL);
+  next_flush.tv_sec = now.tv_sec + config_flush_interval;
+  next_flush.tv_nsec = 1000 * now.tv_usec;
+
+  pthread_mutex_lock (&cache_lock);
+  while ((do_shutdown == 0) || (cache_queue_head != NULL))
+  {
+    cache_item_t *ci;
+    char *file;
+    char **values;
+    int values_num;
+    int status;
+    int i;
+
+    /* First, check if it's time to do the cache flush. */
+    gettimeofday (&now, NULL);
+    if ((now.tv_sec > next_flush.tv_sec)
+        || ((now.tv_sec == next_flush.tv_sec)
+          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
+    {
+      time_t time_now;
+
+      /* Pass the current time as user data so that we don't need to call
+       * `time' for each node. */
+      time_now = time (NULL);
+
+      g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
+
+      /* Determine the time of the next cache flush. */
+      while (next_flush.tv_sec < now.tv_sec)
+        next_flush.tv_sec += config_flush_interval;
+    }
+
+    /* Now, check if there's something to store away. If not, wait until
+     * something comes in or it's time to do the cache flush. */
+    if (cache_queue_head == NULL)
+    {
+      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
+      if ((status != 0) && (status != ETIMEDOUT))
+      {
+        RRDD_LOG (LOG_ERR, "queue_thread_main: "
+            "pthread_cond_timedwait returned %i.", status);
+      }
+    }
+
+    /* Check if a value has arrived. This may be NULL if we timed out or there
+     * was an interrupt such as a signal. */
+    if (cache_queue_head == NULL)
+      continue;
+
+    ci = cache_queue_head;
+
+    /* copy the relevant parts */
+    file = strdup (ci->file);
+    if (file == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
+      continue;
+    }
+
+    values = ci->values;
+    values_num = ci->values_num;
+
+    ci->values = NULL;
+    ci->values_num = 0;
+
+    ci->last_flush_time = time (NULL);
+    ci->flags &= ~(CI_FLAGS_IN_QUEUE);
+
+    cache_queue_head = ci->next;
+    if (cache_queue_head == NULL)
+      cache_queue_tail = NULL;
+    ci->next = NULL;
+
+    pthread_mutex_unlock (&cache_lock);
+
+    RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
+        file, values_num, (void *) values);
+
+    status = rrd_update_r (file, NULL, values_num, (void *) values);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: "
+          "rrd_update_r failed with status %i.",
+          status);
+    }
+
+    free (file);
+    for (i = 0; i < values_num; i++)
+      free (values[i]);
+
+    pthread_mutex_lock (&cache_lock);
+    pthread_cond_broadcast (&flush_cond);
+  } /* while (do_shutdown == 0) */
+  pthread_mutex_unlock (&cache_lock);
+
+  RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
+
+  return (NULL);
+} /* }}} void *queue_thread_main */
+
+static int buffer_get_field (char **buffer_ret, /* {{{ */
+    size_t *buffer_size_ret, char **field_ret)
+{
+  char *buffer;
+  size_t buffer_pos;
+  size_t buffer_size;
+  char *field;
+  size_t field_size;
+  int status;
+
+  buffer = *buffer_ret;
+  buffer_pos = 0;
+  buffer_size = *buffer_size_ret;
+  field = *buffer_ret;
+  field_size = 0;
+
+  /* This is ensured by `handle_request'. */
+  assert (buffer[buffer_size - 1] == ' ');
+
+  status = -1;
+  while (buffer_pos < buffer_size)
+  {
+    /* Check for end-of-field or end-of-buffer */
+    if (buffer[buffer_pos] == ' ')
+    {
+      field[field_size] = 0;
+      field_size++;
+      buffer_pos++;
+      status = 0;
+      break;
+    }
+    /* Handle escaped characters. */
+    else if (buffer[buffer_pos] == '\\')
+    {
+      if (buffer_pos >= (buffer_size - 1))
+        break;
+      buffer_pos++;
+      field[field_size] = buffer[buffer_pos];
+      field_size++;
+      buffer_pos++;
+    }
+    /* Normal operation */ 
+    else
+    {
+      field[field_size] = buffer[buffer_pos];
+      field_size++;
+      buffer_pos++;
+    }
+  } /* while (buffer_pos < buffer_size) */
+
+  if (status != 0)
+    return (status);
+
+  *buffer_ret = buffer + buffer_pos;
+  *buffer_size_ret = buffer_size - buffer_pos;
+  *field_ret = field;
+
+  return (0);
+} /* }}} int buffer_get_field */
+
+static int flush_file (const char *filename) /* {{{ */
+{
+  cache_item_t *ci;
+
+  pthread_mutex_lock (&cache_lock);
+
+  ci = g_tree_lookup (cache_tree, filename);
+  if (ci == NULL)
+  {
+    pthread_mutex_unlock (&cache_lock);
+    return (ENOENT);
+  }
+
+  /* Enqueue at head */
+  enqueue_cache_item (ci, HEAD);
+  pthread_cond_signal (&cache_cond);
+
+  while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+  {
+    ci = NULL;
+
+    pthread_cond_wait (&flush_cond, &cache_lock);
+
+    ci = g_tree_lookup (cache_tree, filename);
+    if (ci == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
+          "while waiting for flush.");
+      pthread_mutex_unlock (&cache_lock);
+      return (-1);
+    }
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+  return (0);
+} /* }}} int flush_file */
+
+static int handle_request_flush (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  char *file;
+  int status;
+  char result[4096];
+
+  status = buffer_get_field (&buffer, &buffer_size, &file);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name.");
+    return (-1);
+  }
+
+  status = flush_file (file);
+  if (status == 0)
+    snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
+  else if (status == ENOENT)
+    snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
+  else if (status < 0)
+    strncpy (result, "-1 Internal error.\n", sizeof (result));
+  else
+    snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
+  result[sizeof (result) - 1] = 0;
+
+  status = write (fd, result, strlen (result));
+  if (status < 0)
+  {
+    status = errno;
+    RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error.");
+    return (status);
+  }
+
+  return (0);
+} /* }}} int handle_request_flush */
+
+static int handle_request_update (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  char *file;
+  int values_num = 0;
+  int status;
+
+  time_t now;
+
+  cache_item_t *ci;
+  char answer[4096];
+
+  now = time (NULL);
+
+  status = buffer_get_field (&buffer, &buffer_size, &file);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
+    return (-1);
+  }
+
+  pthread_mutex_lock (&cache_lock);
+
+  ci = g_tree_lookup (cache_tree, file);
+  if (ci == NULL) /* {{{ */
+  {
+    ci = (cache_item_t *) malloc (sizeof (cache_item_t));
+    if (ci == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+      return (-1);
+    }
+    memset (ci, 0, sizeof (cache_item_t));
+
+    ci->file = strdup (file);
+    if (ci->file == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+      free (ci);
+      return (-1);
+    }
+
+    ci->values = NULL;
+    ci->values_num = 0;
+    ci->last_flush_time = now;
+    ci->flags = CI_FLAGS_IN_TREE;
+
+    g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
+
+    RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
+        ci->file);
+  } /* }}} */
+  assert (ci != NULL);
+
+  while (buffer_size > 0)
+  {
+    char **temp;
+    char *value;
+
+    status = buffer_get_field (&buffer, &buffer_size, &value);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
+      break;
+    }
+
+    temp = (char **) realloc (ci->values,
+        sizeof (char *) * (ci->values_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
+      continue;
+    }
+    ci->values = temp;
+
+    ci->values[ci->values_num] = strdup (value);
+    if (ci->values[ci->values_num] == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+      continue;
+    }
+    ci->values_num++;
+
+    values_num++;
+  }
+
+  if (((now - ci->last_flush_time) >= config_write_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+    pthread_cond_signal (&cache_cond);
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+
+  snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
+  answer[sizeof (answer) - 1] = 0;
+
+  status = write (fd, answer, strlen (answer));
+  if (status < 0)
+  {
+    status = errno;
+    RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
+    return (status);
+  }
+
+  return (0);
+} /* }}} int handle_request_update */
+
+static int handle_request (int fd) /* {{{ */
+{
+  char buffer[4096];
+  size_t buffer_size;
+  char *buffer_ptr;
+  char *command;
+  int status;
+
+  status = read (fd, buffer, sizeof (buffer));
+  if (status == 0)
+  {
+    return (1);
+  }
+  else if (status < 0)
+  {
+    RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
+    return (-1);
+  }
+  buffer_size = status;
+  assert (((size_t) buffer_size) <= sizeof (buffer));
+
+  if (buffer[buffer_size - 1] != '\n')
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
+    return (-1);
+  }
+
+  /* Accept Windows style line endings, too */
+  if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r'))
+  {
+    buffer_size--;
+    buffer[buffer_size - 1] = '\n';
+  }
+
+  /* Place the normal field separator at the end to simplify
+   * `buffer_get_field's work. */
+  buffer[buffer_size - 1] = ' ';
+
+  buffer_ptr = buffer;
+  command = NULL;
+  status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
+    return (-1);
+  }
+
+  if (strcmp (command, "update") == 0)
+  {
+    return (handle_request_update (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcmp (command, "flush") == 0)
+  {
+    return (handle_request_flush (fd, buffer_ptr, buffer_size));
+  }
+  else
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
+    return (-1);
+  }
+} /* }}} int handle_request */
+
+static void *connection_thread_main (void *args /* {{{ */
+    __attribute__((unused)))
+{
+  pthread_t self;
+  int i;
+  int fd;
+  
+  fd = *((int *) args);
+
+  pthread_mutex_lock (&connetion_threads_lock);
+  {
+    pthread_t *temp;
+
+    temp = (pthread_t *) realloc (connetion_threads,
+        sizeof (pthread_t) * (connetion_threads_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
+    }
+    else
+    {
+      connetion_threads = temp;
+      connetion_threads[connetion_threads_num] = pthread_self ();
+      connetion_threads_num++;
+    }
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  while (do_shutdown == 0)
+  {
+    struct pollfd pollfd;
+    int status;
+
+    pollfd.fd = fd;
+    pollfd.events = POLLIN | POLLPRI;
+    pollfd.revents = 0;
+
+    status = poll (&pollfd, 1, /* timeout = */ 500);
+    if (status == 0) /* timeout */
+      continue;
+    else if (status < 0) /* error */
+    {
+      status = errno;
+      if (status == EINTR)
+        continue;
+      RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
+      continue;
+    }
+
+    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
+    {
+      close (fd);
+      break;
+    }
+    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
+    {
+      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
+          "poll(2) returned something unexpected: %#04hx",
+          pollfd.revents);
+      close (fd);
+      break;
+    }
+
+    status = handle_request (fd);
+    if (status != 0)
+    {
+      close (fd);
+      break;
+    }
+  }
+
+  self = pthread_self ();
+  /* Remove this thread from the connection threads list */
+  pthread_mutex_lock (&connetion_threads_lock);
+  /* Find out own index in the array */
+  for (i = 0; i < connetion_threads_num; i++)
+    if (pthread_equal (connetion_threads[i], self) != 0)
+      break;
+  assert (i < connetion_threads_num);
+
+  /* Move the trailing threads forward. */
+  if (i < (connetion_threads_num - 1))
+  {
+    memmove (connetion_threads + i,
+        connetion_threads + i + 1,
+        sizeof (pthread_t) * (connetion_threads_num - i - 1));
+  }
+
+  connetion_threads_num--;
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  free (args);
+  return (NULL);
+} /* }}} void *connection_thread_main */
+
+static int open_listen_socket_unix (const char *path) /* {{{ */
+{
+  int fd;
+  struct sockaddr_un sa;
+  listen_socket_t *temp;
+  int status;
+
+  temp = (listen_socket_t *) realloc (listen_fds,
+      sizeof (listen_fds[0]) * (listen_fds_num + 1));
+  if (temp == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
+    return (-1);
+  }
+  listen_fds = temp;
+  memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+  fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
+  if (fd < 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
+    return (-1);
+  }
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sun_family = AF_UNIX;
+  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
+
+  status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+
+  status = listen (fd, /* backlog = */ 10);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+  
+  listen_fds[listen_fds_num].fd = fd;
+  snprintf (listen_fds[listen_fds_num].path,
+      sizeof (listen_fds[listen_fds_num].path) - 1,
+      "unix:%s", path);
+  listen_fds_num++;
+
+  return (0);
+} /* }}} int open_listen_socket_unix */
+
+static int open_listen_socket (const char *addr) /* {{{ */
+{
+  struct addrinfo ai_hints;
+  struct addrinfo *ai_res;
+  struct addrinfo *ai_ptr;
+  int status;
+
+  assert (addr != NULL);
+
+  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
+    return (open_listen_socket_unix (addr + strlen ("unix:")));
+  else if (addr[0] == '/')
+    return (open_listen_socket_unix (addr));
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+
+  ai_res = NULL;
+  status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
+        "%s", addr, gai_strerror (status));
+    return (-1);
+  }
+
+  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    int fd;
+    listen_socket_t *temp;
+
+    temp = (listen_socket_t *) realloc (listen_fds,
+        sizeof (listen_fds[0]) * (listen_fds_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
+      continue;
+    }
+    listen_fds = temp;
+    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
+      continue;
+    }
+
+    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
+      close (fd);
+      continue;
+    }
+
+    status = listen (fd, /* backlog = */ 10);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
+      close (fd);
+      return (-1);
+    }
+
+    listen_fds[listen_fds_num].fd = fd;
+    strncpy (listen_fds[listen_fds_num].path, addr,
+        sizeof (listen_fds[listen_fds_num].path) - 1);
+    listen_fds_num++;
+  } /* for (ai_ptr) */
+
+  return (0);
+} /* }}} int open_listen_socket */
+
+static int close_listen_sockets (void) /* {{{ */
+{
+  size_t i;
+
+  for (i = 0; i < listen_fds_num; i++)
+  {
+    close (listen_fds[i].fd);
+    if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
+      unlink (listen_fds[i].path + strlen ("unix:"));
+  }
+
+  free (listen_fds);
+  listen_fds = NULL;
+  listen_fds_num = 0;
+
+  return (0);
+} /* }}} int close_listen_sockets */
+
+static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct pollfd *pollfds;
+  int pollfds_num;
+  int status;
+  int i;
+
+  for (i = 0; i < config_listen_address_list_len; i++)
+  {
+    RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
+        "= %s", i, config_listen_address_list[i]);
+    open_listen_socket (config_listen_address_list[i]);
+  }
+
+  if (config_listen_address_list_len < 1)
+    open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
+
+  if (listen_fds_num < 1)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
+        "could be opened. Sorry.");
+    return (NULL);
+  }
+
+  pollfds_num = listen_fds_num;
+  pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
+  if (pollfds == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+    return (NULL);
+  }
+  memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
+
+  while (do_shutdown == 0)
+  {
+    assert (pollfds_num == ((int) listen_fds_num));
+    for (i = 0; i < pollfds_num; i++)
+    {
+      pollfds[i].fd = listen_fds[i].fd;
+      pollfds[i].events = POLLIN | POLLPRI;
+      pollfds[i].revents = 0;
+    }
+
+    status = poll (pollfds, pollfds_num, /* timeout = */ -1);
+    if (status < 1)
+    {
+      status = errno;
+      if (status != EINTR)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
+      }
+      continue;
+    }
+
+    for (i = 0; i < pollfds_num; i++)
+    {
+      int *client_sd;
+      struct sockaddr_storage client_sa;
+      socklen_t client_sa_size;
+      pthread_t tid;
+
+      if (pollfds[i].revents == 0)
+        continue;
+
+      if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: "
+            "poll(2) returned something unexpected for listen FD #%i.",
+            pollfds[i].fd);
+        continue;
+      }
+
+      client_sd = (int *) malloc (sizeof (int));
+      if (client_sd == NULL)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+        continue;
+      }
+
+      client_sa_size = sizeof (client_sa);
+      *client_sd = accept (pollfds[i].fd,
+          (struct sockaddr *) &client_sa, &client_sa_size);
+      if (*client_sd < 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
+        continue;
+      }
+
+      status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
+          /* args = */ (void *) client_sd);
+      if (status != 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
+        close (*client_sd);
+        free (client_sd);
+        continue;
+      }
+    } /* for (pollfds_num) */
+  } /* while (do_shutdown == 0) */
+
+  close_listen_sockets ();
+
+  pthread_mutex_lock (&connetion_threads_lock);
+  while (connetion_threads_num > 0)
+  {
+    pthread_t wait_for;
+
+    wait_for = connetion_threads[0];
+
+    pthread_mutex_unlock (&connetion_threads_lock);
+    pthread_join (wait_for, /* retval = */ NULL);
+    pthread_mutex_lock (&connetion_threads_lock);
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
+
+  return (NULL);
+} /* }}} void *listen_thread_main */
+
+static int daemonize (void) /* {{{ */
+{
+  pid_t child;
+  int status;
+  char *base_dir;
+
+  /* These structures are static, because `sigaction' behaves weird if the are
+   * overwritten.. */
+  static struct sigaction sa_int;
+  static struct sigaction sa_term;
+  static struct sigaction sa_pipe;
+
+  child = fork ();
+  if (child < 0)
+  {
+    fprintf (stderr, "daemonize: fork(2) failed.\n");
+    return (-1);
+  }
+  else if (child > 0)
+  {
+    return (1);
+  }
+
+  /* Change into the /tmp directory. */
+  base_dir = (config_base_dir != NULL)
+    ? config_base_dir
+    : "/tmp";
+  status = chdir (base_dir);
+  if (status != 0)
+  {
+    fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
+    return (-1);
+  }
+
+  /* Become session leader */
+  setsid ();
+
+  /* Open the first three file descriptors to /dev/null */
+  close (2);
+  close (1);
+  close (0);
+
+  open ("/dev/null", O_RDWR);
+  dup (0);
+  dup (0);
+
+  /* Install signal handlers */
+  memset (&sa_int, 0, sizeof (sa_int));
+  sa_int.sa_handler = sig_int_handler;
+  sigaction (SIGINT, &sa_int, NULL);
+
+  memset (&sa_term, 0, sizeof (sa_term));
+  sa_term.sa_handler = sig_term_handler;
+  sigaction (SIGINT, &sa_term, NULL);
+
+  memset (&sa_pipe, 0, sizeof (sa_pipe));
+  sa_pipe.sa_handler = SIG_IGN;
+  sigaction (SIGPIPE, &sa_pipe, NULL);
+
+  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
+
+  cache_tree = g_tree_new ((GCompareFunc) strcmp);
+  if (cache_tree == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
+    return (-1);
+  }
+
+  memset (&queue_thread, 0, sizeof (queue_thread));
+  status = pthread_create (&queue_thread, /* attr = */ NULL,
+      queue_thread_main, /* args = */ NULL);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
+    return (-1);
+  }
+
+  write_pidfile ();
+
+  return (0);
+} /* }}} int daemonize */
+
+static int cleanup (void) /* {{{ */
+{
+  RRDD_LOG (LOG_DEBUG, "cleanup ()");
+
+  do_shutdown++;
+
+  RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
+  pthread_cond_signal (&cache_cond);
+  pthread_join (queue_thread, /* return = */ NULL);
+  RRDD_LOG (LOG_DEBUG, "cleanup: done");
+
+  remove_pidfile ();
+
+  closelog ();
+
+  return (0);
+} /* }}} int cleanup */
+
+static int read_options (int argc, char **argv) /* {{{ */
+{
+  int option;
+  int status = 0;
+
+  while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
+  {
+    switch (option)
+    {
+      case 'l':
+      {
+        char **temp;
+
+        temp = (char **) realloc (config_listen_address_list,
+            sizeof (char *) * (config_listen_address_list_len + 1));
+        if (temp == NULL)
+        {
+          fprintf (stderr, "read_options: realloc failed.\n");
+          return (2);
+        }
+        config_listen_address_list = temp;
+
+        temp[config_listen_address_list_len] = strdup (optarg);
+        if (temp[config_listen_address_list_len] == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (2);
+        }
+        config_listen_address_list_len++;
+      }
+      break;
+
+      case 'f':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_flush_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid flush interval: %s\n", optarg);
+          status = 3;
+        }
+      }
+      break;
+
+      case 'w':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_write_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid write interval: %s\n", optarg);
+          status = 2;
+        }
+      }
+      break;
+
+      case 'b':
+      {
+        size_t len;
+
+        if (config_base_dir != NULL)
+          free (config_base_dir);
+        config_base_dir = strdup (optarg);
+        if (config_base_dir == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (3);
+        }
+
+        len = strlen (config_base_dir);
+        while ((len > 0) && (config_base_dir[len - 1] == '/'))
+        {
+          config_base_dir[len - 1] = 0;
+          len--;
+        }
+
+        if (len < 1)
+        {
+          fprintf (stderr, "Invalid base directory: %s\n", optarg);
+          return (4);
+        }
+      }
+      break;
+
+      case 'p':
+      {
+        if (config_pid_file != NULL)
+          free (config_pid_file);
+        config_pid_file = strdup (optarg);
+        if (config_pid_file == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (3);
+        }
+      }
+      break;
+
+      case 'h':
+      case '?':
+        printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
+            "\n"
+            "Usage: rrdcached [options]\n"
+            "\n"
+            "Valid options are:\n"
+            "  -l <address>  Socket address to listen to.\n"
+            "  -w <seconds>  Interval in which to write data.\n"
+            "  -f <seconds>  Interval in which to flush dead data.\n"
+            "  -p <file>     Location of the PID-file.\n"
+            "  -b <dir>      Base directory to change to.\n"
+            "\n"
+            "For more information and a detailed description of all options "
+            "please refer\n"
+            "to the rrdcached(1) manual page.\n",
+            VERSION);
+        status = -1;
+        break;
+    } /* switch (option) */
+  } /* while (getopt) */
+
+  return (status);
+} /* }}} int read_options */
+
+int main (int argc, char **argv)
+{
+  int status;
+
+  status = read_options (argc, argv);
+  if (status != 0)
+  {
+    if (status < 0)
+      status = 0;
+    return (status);
+  }
+
+  status = daemonize ();
+  if (status == 1)
+  {
+    struct sigaction sigchld;
+
+    memset (&sigchld, 0, sizeof (sigchld));
+    sigchld.sa_handler = SIG_IGN;
+    sigaction (SIGCHLD, &sigchld, NULL);
+
+    return (0);
+  }
+  else if (status != 0)
+  {
+    fprintf (stderr, "daemonize failed, exiting.\n");
+    return (1);
+  }
+
+  listen_thread_main (NULL);
+
+  cleanup ();
+
+  return (0);
+} /* int main */
+
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */
index 4ea2eb1..26c3324 100644 (file)
@@ -53,6 +53,7 @@
  *****************************************************************************/
 
 #include "rrd_tool.h"
+#include "rrd_client.h"
 
 #include "rrd_is_thread_safe.h"
 /*#define DEBUG*/
@@ -72,6 +73,7 @@ int rrd_fetch(
     long      step_tmp = 1;
     time_t    start_tmp = 0, end_tmp = 0;
     const char *cf;
+    char *daemon = NULL;
 
     rrd_time_value_t start_tv, end_tv;
     char     *parsetime_error = NULL;
@@ -79,6 +81,7 @@ int rrd_fetch(
         {"resolution", required_argument, 0, 'r'},
         {"start", required_argument, 0, 's'},
         {"end", required_argument, 0, 'e'},
+        {"daemon", required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
 
@@ -93,7 +96,7 @@ int rrd_fetch(
         int       option_index = 0;
         int       opt;
 
-        opt = getopt_long(argc, argv, "r:s:e:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "r:s:e:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -114,6 +117,18 @@ int rrd_fetch(
         case 'r':
             step_tmp = atol(optarg);
             break;
+
+        case 'd':
+           if (daemon != NULL)
+                   free (daemon);
+            daemon = strdup (optarg);
+            if (daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
         case '?':
             rrd_set_error("unknown option '-%c'", optopt);
             return (-1);
@@ -151,10 +166,26 @@ int rrd_fetch(
         return -1;
     }
 
+    if (daemon == NULL)
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            daemon = strdup (temp);
+            if (daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+               return (-1);
+            }
+        }
+    }
+
     cf = argv[optind + 1];
 
-    if (rrd_fetch_r(argv[optind], cf, start, end, step, ds_cnt, ds_namv, data)
-        != 0)
+    if (rrd_fetch_r(argv[optind], cf, start, end, step, daemon, ds_cnt,
+                           ds_namv, data) != 0)
         return (-1);
     return (0);
 }
@@ -167,19 +198,36 @@ int rrd_fetch_r(
                          * will be changed to represent reality */
     unsigned long *step,    /* which stepsize do you want? 
                              * will be changed to represent reality */
+    const char *daemon,
     unsigned long *ds_cnt,  /* number of data sources in file */
     char ***ds_namv,    /* names of data_sources */
     rrd_value_t **data)
 {                       /* two dimensional array containing the data */
     enum cf_en cf_idx;
+    int status;
 
     if ((int) (cf_idx = cf_conv(cf)) == -1) {
         return -1;
     }
 
-    return (rrd_fetch_fn
-            (filename, cf_idx, start, end, step, ds_cnt, ds_namv, data));
-}
+    if (daemon != NULL)
+    {
+        status = rrdc_connect (daemon);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_connect failed with status %i.", status);
+            return (-1);
+        }
+    }
+
+    status = rrd_fetch_fn (filename, cf_idx, start, end, step,
+            (daemon == NULL) ? 0 : 1,
+            ds_cnt, ds_namv, data);
+
+    rrdc_disconnect ();
+
+    return (status);
+} /* int rrd_fetch_r */
 
 int rrd_fetch_fn(
     const char *filename,   /* name of the rrd */
@@ -189,6 +237,7 @@ int rrd_fetch_fn(
                          * will be changed to represent reality */
     unsigned long *step,    /* which stepsize do you want? 
                              * will be changed to represent reality */
+    int use_rrdcached,
     unsigned long *ds_cnt,  /* number of data sources in file */
     char ***ds_namv,    /* names of data_sources */
     rrd_value_t **data)
@@ -208,6 +257,18 @@ int rrd_fetch_fn(
     rrd_value_t *data_ptr;
     unsigned long rows;
 
+    if (use_rrdcached)
+    {
+        int status;
+
+        status = rrdc_flush (filename);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_flush failed with status %i.", status);
+            return (-1);
+        }
+    }
+
 #ifdef DEBUG
     fprintf(stderr, "Entered rrd_fetch_fn() searching for the best match\n");
     fprintf(stderr, "Looking for: start %10lu end %10lu step %5lu\n",
index e671f78..dbca157 100644 (file)
@@ -26,6 +26,7 @@
 #endif
 
 #include "rrd_graph.h"
+#include "rrd_client.h"
 
 /* some constant definitions */
 
@@ -302,6 +303,13 @@ int im_free(
 
     if (im == NULL)
         return 0;
+
+    if (im->use_rrdcached)
+    {
+        rrdc_disconnect ();
+        im->use_rrdcached = 0;
+    }
+
     for (i = 0; i < (unsigned) im->gdes_c; i++) {
         if (im->gdes[i].data_first) {
             /* careful here, because a single pointer can occur several times */
@@ -829,6 +837,7 @@ int data_fetch(
                               &im->gdes[i].start,
                               &im->gdes[i].end,
                               &ft_step,
+                              im->use_rrdcached ? 1 : 0,
                               &im->gdes[i].ds_cnt,
                               &im->gdes[i].ds_namv,
                               &im->gdes[i].data)) == -1) {
@@ -3720,6 +3729,7 @@ void rrd_graph_init(
     im->grinfo_current = (rrd_info_t *) NULL;
     im->imgformat = IF_PNG;
     im->imginfo = NULL;
+    im->use_rrdcached = 0;
     im->lazy = 0;
     im->logarithmic = 0;
     im->maxval = DNAN;
@@ -3838,6 +3848,7 @@ void rrd_graph_options(
         { "watermark",          required_argument, 0, 'W'},
         { "alt-y-mrtg",         no_argument,       0, 1000},    /* this has no effect it is just here to save old apps from crashing when they use it */
         { "pango-markup",       no_argument,       0, 'P'},
+        { "daemon",             required_argument, 0, 'd'},
         {  0, 0, 0, 0}
 };
 /* *INDENT-ON* */
@@ -3852,7 +3863,7 @@ void rrd_graph_options(
         int       col_start, col_end;
 
         opt = getopt_long(argc, argv,
-                          "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kP",
+                          "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kPd:",
                           long_options, &option_index);
         if (opt == EOF)
             break;
@@ -4196,6 +4207,25 @@ void rrd_graph_options(
             strncpy(im->watermark, optarg, 100);
             im->watermark[99] = '\0';
             break;
+        case 'd':
+        {
+            int status;
+            if (im->use_rrdcached)
+            {
+                rrd_set_error ("You cannot specify --daemon "
+                        "more than once.");
+                return;
+            }
+            status = rrdc_connect (optarg);
+            if (status != 0)
+            {
+                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
+                        optarg, status);
+                return;
+            }
+            im->use_rrdcached = 1;
+            break;
+        }
         case '?':
             if (optopt != 0)
                 rrd_set_error("unknown option '%c'", optopt);
@@ -4203,6 +4233,26 @@ void rrd_graph_options(
                 rrd_set_error("unknown option '%s'", argv[optind - 1]);
             return;
         }
+    } /* while (1) */
+
+    if (im->use_rrdcached == 0)
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            int status;
+
+            status = rrdc_connect (temp);
+            if (status != 0)
+            {
+                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
+                        temp, status);
+                return;
+            }
+            im->use_rrdcached = 1;
+        }
     }
 
     if (im->logarithmic && im->minval <= 0) {
index 2b1c05b..c21f356 100644 (file)
@@ -210,6 +210,7 @@ typedef struct image_desc_t {
     char     *imginfo;  /* construct an <IMG ... tag and return 
                            as first retval */
     enum gfx_if_en imgformat;   /* image format */
+    int       use_rrdcached;
     int       lazy;     /* only update the image if there is
                            reasonable probablility that the
                            existing one is out of date */
index 8efd492..42ebfd1 100644 (file)
@@ -95,7 +95,8 @@ void PrintUsage(
     const char *help_update =
         N_("* update - update an RRD\n\n"
            "\trrdtool update filename\n"
-           "\t\t--template|-t ds-name:ds-name:...\n"
+           "\t\t[--template|-t ds-name:ds-name:...]\n"
+          "\t\t[--daemon <address>]\n"
            "\t\ttime|N:value[:value...]\n\n"
            "\t\tat-time@value[:value...]\n\n"
            "\t\t[ time:value[:value...] ..]\n\n");
@@ -104,7 +105,7 @@ void PrintUsage(
         N_("* updatev - a verbose version of update\n"
            "\treturns information about values, RRAs, and datasources updated\n\n"
            "\trrdtool updatev filename\n"
-           "\t\t--template|-t ds-name:ds-name:...\n"
+           "\t\t[--template|-t ds-name:ds-name:...]\n"
            "\t\ttime|N:value[:value...]\n\n"
            "\t\tat-time@value[:value...]\n\n"
            "\t\t[ time:value[:value...] ..]\n\n");
@@ -113,7 +114,8 @@ void PrintUsage(
         N_("* fetch - fetch data out of an RRD\n\n"
            "\trrdtool fetch filename.rrd CF\n"
            "\t\t[-r|--resolution resolution]\n"
-           "\t\t[-s|--start start] [-e|--end end]\n\n");
+           "\t\t[-s|--start start] [-e|--end end]\n"
+          "\t\t[--daemon <address>]\n\n");
 
 /* break up very large strings (help_graph, help_tune) for ISO C89 compliance*/
 
@@ -132,7 +134,7 @@ void PrintUsage(
            "\t\t[-h|--height pixels] [-o|--logarithmic]\n"
            "\t\t[-u|--upper-limit value] [-z|--lazy]\n"
            "\t\t[-l|--lower-limit value] [-r|--rigid]\n"
-           "\t\t[-g|--no-legend]\n"
+           "\t\t[-g|--no-legend] [--daemon <address>]\n"
            "\t\t[-F|--force-rules-legend]\n" "\t\t[-j|--only-graph]\n");
     const char *help_graph2 =
         N_("\t\t[-n|--font FONTTAG:size:font]\n"
index 0be66e4..63359b6 100644 (file)
@@ -77,15 +77,15 @@ extern    "C" {
     int       rrd_create_fn(
     const char *file_name,
     rrd_t *rrd);
-    int       rrd_fetch_fn(
-    const char *filename,
-    enum cf_en cf_idx,
-    time_t *start,
-    time_t *end,
-    unsigned long *step,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    rrd_value_t **data);
+    int rrd_fetch_fn (const char *filename,
+            enum cf_en cf_idx,
+            time_t *start,
+            time_t *end,
+            unsigned long *step,
+            int use_rrdcached,
+            unsigned long *ds_cnt,
+            char ***ds_namv,
+            rrd_value_t **data);
 
 #define RRD_READONLY    (1<<0)
 #define RRD_READWRITE   (1<<1)
index fe6aea4..5536c3c 100644 (file)
@@ -1,6 +1,7 @@
 
 /*****************************************************************************
  * RRDtool 1.3.0  Copyright by Tobi Oetiker, 1997-2008
+ *                Copyright by Florian Forster, 2008
  *****************************************************************************
  * rrd_update.c  RRD Update Function
  *****************************************************************************
@@ -23,6 +24,8 @@
 #include "rrd_is_thread_safe.h"
 #include "unused.h"
 
+#include "rrd_client.h"
+
 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
 /*
  * WIN32 does not have gettimeofday    and struct timeval. This is a quick and dirty
@@ -374,18 +377,20 @@ int rrd_update(
 {
     struct option long_options[] = {
         {"template", required_argument, 0, 't'},
+        {"daemon",   required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
     int       option_index = 0;
     int       opt;
     char     *tmplt = NULL;
     int       rc = -1;
+    char     *daemon = NULL;
 
     optind = 0;
     opterr = 0;         /* initialize getopt */
 
     while (1) {
-        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -395,6 +400,17 @@ int rrd_update(
             tmplt = strdup(optarg);
             break;
 
+        case 'd':
+            if (daemon != NULL)
+                free (daemon);
+            daemon = strdup (optarg);
+            if (daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                goto out;
+            }
+            break;
+
         case '?':
             rrd_set_error("unknown option '%s'", argv[optind - 1]);
             goto out;
@@ -407,10 +423,71 @@ int rrd_update(
         goto out;
     }
 
+    if ((tmplt != NULL) && (daemon != NULL))
+    {
+        rrd_set_error("The caching daemon cannot be used together with "
+                "templates yet.");
+        goto out;
+    }
+
+    if ((tmplt == NULL) && (daemon == NULL))
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            daemon = strdup (temp);
+            if (daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                goto out;
+            }
+        }
+    }
+
+    if (daemon != NULL)
+    {
+        int status;
+
+        status = rrdc_connect (daemon);
+        if (status != 0)
+        {
+            rrd_set_error("Unable to connect to daemon: %s",
+                    (status < 0)
+                    ? "Internal error"
+                    : rrd_strerror (status));
+            goto out;
+        }
+
+        status = rrdc_update (/* file = */ argv[optind],
+                /* values_num = */ argc - optind - 1,
+                /* values = */ (void *) (argv + optind + 1));
+        if (status != 0)
+        {
+            rrd_set_error("Failed sending the values to the daemon: %s",
+                    (status < 0)
+                    ? "Internal error"
+                    : rrd_strerror (status));
+        }
+
+        rrdc_disconnect ();
+        goto out;
+    } /* if (daemon != NULL) */
+
     rc = rrd_update_r(argv[optind], tmplt,
                       argc - optind - 1, (const char **) (argv + optind + 1));
   out:
-    free(tmplt);
+    if (tmplt != NULL)
+    {
+        free(tmplt);
+        tmplt = NULL;
+    }
+    if (daemon != NULL)
+    {
+        free (daemon);
+        daemon = NULL;
+    }
     return rc;
 }