Merge branch 'master' into ff/rrdd
authorFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 14 Sep 2008 08:23:10 +0000 (10:23 +0200)
committerFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 14 Sep 2008 08:23:10 +0000 (10:23 +0200)
32 files changed:
CONTRIBUTORS
configure.ac
doc/Makefile.am
doc/rrdcached.pod [new file with mode: 0644]
doc/rrddump.pod
doc/rrdfetch.pod
doc/rrdflush.pod [new file with mode: 0644]
doc/rrdgraph.pod
doc/rrdinfo.pod
doc/rrdlast.pod
doc/rrdlastupdate.pod
doc/rrdtool.pod
doc/rrdupdate.pod
doc/rrdxport.pod
src/Makefile.am
src/librrd.sym.in
src/rrd.h
src/rrd_client.c [new file with mode: 0644]
src/rrd_client.h [new file with mode: 0644]
src/rrd_daemon.c [new file with mode: 0644]
src/rrd_dump.c
src/rrd_fetch.c
src/rrd_flush.c [new file with mode: 0644]
src/rrd_graph.c
src/rrd_graph.h
src/rrd_info.c
src/rrd_last.c
src/rrd_lastupdate.c
src/rrd_tool.c
src/rrd_tool.h
src/rrd_update.c
src/rrd_xport.c

index 847c1d1..ebb76e3 100644 (file)
@@ -21,7 +21,7 @@ David Grimes <dgrimes with navisite.com> SQRT/SORT/REV/SHIFT/TREND
 David L. Barker <dave with ncomtech.com> xport function bug fixes
 Evan Miller <emiller with imvu.com> Multiplicative HW Enhancements
 Frank Strauss <strauss with escape.de> TCL bindings
-Florian octo Forster <rrdtool nospam.verplant.org> rrd_restore libxml2 rewrite deprecated function export
+Florian octo Forster <rrdtool nospam.verplant.org> rrd_restore libxml2 rewrite, deprecated function export, rrdcached
 Henrik Storner <henrik with hswn.dk> functions for min/max values of data in graph
 Hermann Hueni <hueni with glue.ch> (SunOS porting)
 Jakob Ilves <jilves with se.oracle.com> HPUX 11
@@ -32,6 +32,7 @@ Joel Becker <jlbec with raleigh.ibm.com> AIX
 Joey Miller <joeym with inficad.com>php3 and php4 bindings
 Jost.Krieger <Jost.Krieger with ruhr-uni-bochum.de>
 Kai Siering <kai.siering with mediaways.net>
+Kevin Brintnall <kbrint with rufus.net> bugfixes in and additions to rrdcached, including journaling support
 Larry Leszczynski <larryl with furph.com>
 Mark Plaksin <happy@usg.edu> rrd_graph_v
 Matt Chambers <matthew.chambers with vanderbilt.edu> --full-size-mode for rrdgraph
index cc2e998..451502c 100644 (file)
@@ -130,8 +130,9 @@ CONFIGURE_PART(Audit Compilation Environment)
 
 
 dnl Check for the compiler and static/shared library creation.
-AC_PROG_CC
 AC_PROG_CPP
+AC_PROG_CC
+AM_PROG_CC_C_O
 AC_PROG_LIBTOOL
 
 dnl Try to detect/use GNU features
@@ -466,7 +467,7 @@ EX_CHECK_ALL(cairo,      cairo_font_options_create,     cairo.h,
 EX_CHECK_ALL(cairo,      cairo_svg_surface_create,      cairo-svg.h,            cairo-svg,   1.4.6,  http://cairographics.org/releases/, "")
 EX_CHECK_ALL(cairo,      cairo_pdf_surface_create,      cairo-pdf.h,            cairo-pdf,   1.4.6,  http://cairographics.org/releases/, "")
 EX_CHECK_ALL(cairo,      cairo_ps_surface_create,       cairo-ps.h,             cairo-ps,    1.4.6,  http://cairographics.org/releases/, "")
-dnl EX_CHECK_ALL(glib-2.0,   glib_check_version,        glib.h,                 glib-2.0,    2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "")
+EX_CHECK_ALL(glib-2.0,   glib_check_version,            glib.h,                 glib-2.0,    2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "")
 EX_CHECK_ALL(pango-1.0,  pango_cairo_context_set_font_options,  pango/pango.h,  pangocairo,  1.17,    http://ftp.gnome.org/pub/GNOME/sources/pango/1.17, "")
 EX_CHECK_ALL(xml2,       xmlParseFile,                  libxml/parser.h,        libxml-2.0,        2.6.31,  http://xmlsoft.org/downloads.html, /usr/include/libxml2)
 
index 16fd617..8e52084 100644 (file)
@@ -10,8 +10,8 @@ CLEANFILES = *.1 *.html *.txt *-dircache RRD?.pod *.pdf *~ core *itemcache *.rej
 
 POD = bin_dec_hex.pod        rrddump.pod            rrdgraph_examples.pod  rrdrestore.pod         rrdupdate.pod  \
       cdeftutorial.pod       rrdfetch.pod           rrdgraph_graph.pod     rrdthreads.pod         rrdxport.pod   \
-      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod                           \
-      rrd-beginners.pod      rrdinfo.pod            rrdtune.pod            rrdbuild.pod                          \
+      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod            rrdcached.pod  \
+      rrd-beginners.pod      rrdinfo.pod            rrdtune.pod            rrdbuild.pod           rrdflush.pod   \
       rrdcgi.pod             rrdgraph.pod           rrdlast.pod            rrdlastupdate.pod                     \
       rrdcreate.pod          rrdgraph_data.pod      rrdresize.pod          rrdtutorial.pod                       
 
diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod
new file mode 100644 (file)
index 0000000..e762659
--- /dev/null
@@ -0,0 +1,408 @@
+=pod
+
+=head1 NAME
+
+rrdcached - Data caching daemon for rrdtool
+
+=head1 SYNOPSIS
+
+B<rrdcached> [B<-l> I<address>] [B<-w> I<timeout>] [B<-z> I<delay>] [B<-f> I<timeout>] [B<-j> I<dir>]
+
+=head1 DESCRIPTION
+
+B<rrdcached> is a daemon that receives updates to existing RRD files,
+accumulates them and, if enough have been received or a defined time has
+passed, writes the updates to the RRD file. A I<flush> command may be used to
+force writing of values to disk, so that graphing facilities and similar can
+work with up-to-date data.
+
+The daemon was written with big setups in mind. Those setups usually run into
+IOE<nbsp>related problems sooner or later for reasons that are beyond the scope
+of this document. Check the wiki at the RRDTool homepage for details. Also
+check L<SECURITY CONSIDERATIONS> below before using this daemon! A detailed
+description of how the daemon operates can be found in the L<HOW IT WORKS>
+section below.
+
+=head1 OPTIONS
+
+=over 4
+
+=item B<-l> I<address>
+
+Tells the daemon to bind to I<address> and accept incoming connections on that
+socket. If I<address> begins with C<unix:>, everything following that prefix is
+interpreted as the path to a UNIX domain socket. Otherwise the address or node
+name are resolved using L<getaddrinfo(3)>.
+
+If the B<-l> option is not specified the default address,
+C<unix:/tmp/rrdcached.sock>, will be used.
+
+=item B<-w> I<timeout>
+
+Data is written to disk every I<timeout> seconds. If this option is not
+specified the default interval of 300E<nbsp>seconds will be used.
+
+=item B<-z> I<delay>
+
+If specified, rrdcached will delay writing of each RRD for a random number
+of seconds in the rangeE<nbsp>[0,I<delay>).  This will avoid too many
+writes being queued simultaneously.  This value should be no greater than
+the value specified in B<-w>.  By default, there is no delay.
+
+=item B<-f> I<timeout>
+
+Every I<timeout> seconds the entire cache is searched for old values which are
+written to disk. This only concerns files to which updates have stopped, so
+setting this to a high value, such as 3600E<nbsp>seconds, is acceptable in most
+cases. This timeout defaults to 3600E<nbsp>seconds.
+
+=item B<-p> I<file>
+
+Sets the name and location of the PID-file. If not specified, the default,
+C<I<$localststedir>/run/rrdcached.pid> will be used.
+
+=item B<-j> I<dir>
+
+Write updates to a journal in I<dir>.  In the event of a program or system
+crash, this will allow the daemon to write any updates that were pending
+at the time of the crash.
+
+On startup, the daemon will check for journal files in this directory.  If
+found, all updates therein will be read into memory before the daemon
+starts accepting new connections.
+
+The journal will be rotated with the same frequency as the flush timer
+given by B<-f>.  On clean shutdown, the journal files are removed.
+
+=item B<-b> I<dir>
+
+The daemon will change into a specific directory at startup. All files passed
+to the daemon, that are specified by a B<relative> path, will be interpreted
+to be relative to this directory. If not given the default, C</tmp>, will be
+used.
+
+  +------------------------+------------------------+
+  ! Command line           ! File updated           !
+  +------------------------+------------------------+
+  ! foo.rrd                ! /tmp/foo.rrd           !
+  ! foo/bar.rrd            ! /tmp/foo/bar.rrd       !
+  ! /var/lib/rrd/foo.rrd   ! /var/lib/rrd/foo.rrd   !
+  +------------------------+------------------------+
+  Paths given on the command  line and paths actually
+  updated by the daemon,  assuming the base directory
+  "/tmp".
+
+=back
+
+=head1 EFFECTED RRDTOOL COMMANDS
+
+The following commands may be made aware of the B<rrdcached> using the command
+line argument B<--daemon> or the environment variable B<RRDCACHED_ADDRESS>:
+
+=over 4
+
+=item B<dump>
+
+=item B<fetch>
+
+=item B<flush>
+
+=item B<graph>
+
+=item B<graphv>
+
+=item B<info>
+
+=item B<last>
+
+=item B<lastupdate>
+
+=item B<update>
+
+=item B<xport>
+
+=back
+
+The B<update> command can send values to the daemon instead of writing them to
+the disk itself. All other commands can send a B<FLUSH> command (see below) to
+the daemon before accessing the files, so they work with up-to-date data even
+if the cache timeout is large.
+
+=head1 HOW IT WORKS
+
+When receiving an update, B<rrdcached> does not write to disk but looks for an
+entry for that file in its internal tree. If not found, an entry is created
+including the current time (called "First" in the diagram below). This time is
+B<not> the time specified on the command line but the time the operating system
+considers to be "now". The value and time of the value (called "Time" in the
+diagram below) are appended to the tree node.
+
+When appending a value to a tree node, it is checked whether it's time to write
+the values to disk. Values are written to disk if
+S<C<now() - First E<gt>= timeout>>, where C<timeout> is the timeout specified
+using the B<-w> option, see L<OPTIONS>. If the values are "old enough" they
+will be enqueued in the "update queue", i.E<nbsp>e. they will be appended to
+the linked list shown below.  Because the tree nodes and the elements of the
+linked list are the same data structures in memory, any update to a file that
+has already been enqueued will be written with the next write to the RRD file,
+too.
+
+A separate "update thread" constantly dequeues the first element in the update
+queue and writes all its values to the appropriate file. So as long as the
+update queue is not empty files are written at the highest possible rate.
+
+Since the timeout of files is checked only when new values are added to the
+file, "dead" files, i.E<nbsp>e. files that are not updated anymore, would never
+be written to disk. Therefore, every now and then, controlled by the B<-f>
+option, the entire tree is walked and all "old" values are enqueued. Since this
+only affects "dead" files and walking the tree is relatively expensive, you
+should set the "flush interval" to a reasonably high value. The default is
+3600E<nbsp>seconds (one hour).
+
+The downside of caching values is that they won't show up in graphs generated
+from the RRDE<nbsp>files. To get around this, the daemon provides the "flush
+command" to flush specific files. This means that the file is inserted at the
+B<head> of the update queue or moved there if it is already enqueued. The flush
+command will return after the update thread has dequeued the file, so there is
+a good chance that the file has been updated by the time the client receives
+the response from the daemon, but there is no guarantee.
+
+ +------+   +------+                               +------+
+ ! head !   ! root !                               ! tail !
+ +---+--+   +---+--+                               +---+--+
+     !         /\                                      !
+     !        /  \                                     !
+     !       /\  /\                                    !
+     !      /\/\ \ `----------------- ... --------,    !
+     V     /      `-------,                       !    V
+ +---+----+---+    +------+-----+             +---+----+---+
+ ! File:  foo !    ! File:  bar !             ! File:  qux !
+ ! First: 101 !    ! First: 119 !             ! First: 180 !
+ ! Next:   ---+--->! Next:   ---+---> ... --->! Next:   -  !
+ +============+    +============+             +============+
+ ! Time:  100 !    ! Time:  120 !             ! Time:  180 !
+ ! Value:  10 !    ! Value: 0.1 !             ! Value: 2,2 !
+ +------------+    +------------+             +------------+
+ ! Time:  110 !    ! Time:  130 !             ! Time:  190 !
+ ! Value:  26 !    ! Value: 0.1 !             ! Value: 7,3 !
+ +------------+    +------------+             +------------+
+ :            :    :            :             :            :
+ +------------+    +------------+             +------------+
+ ! Time:  230 !    ! Time:  250 !             ! Time:  310 !
+ ! Value:  42 !    ! Value: 0.2 !             ! Value: 1,2 !
+ +------------+    +------------+             +------------+
+
+The above diagram demonstrates:
+
+=over 4
+
+=item
+
+Files/values are stored in a (balanced) tree.
+
+=item
+
+Tree nodes and entries in the update queue are the same data structure.
+
+=item
+
+The local time ("First") and the time specified in updates ("Time") may differ.  
+
+=item
+
+Timed out values are inserted at the "tail".
+
+=item
+
+Explicitly flushed values are inserted at the "head".
+
+=item
+
+ASCII art rocks.
+
+=back
+
+=head1 SECURITY CONSIDERATIONS
+
+This daemon is meant to improve IOE<nbsp>performance for setups with thousands
+of RRDE<nbsp>file to be updated. So security measures built into the daemon can
+be summarized easily: B<There is no security built in!>
+
+There is no authentication and authorization, so B<you> will have to take care
+that only authorized clients can talk to the daemon. Since we assume that graph
+collection is done on a dedicated machine, i.E<nbsp>e. the box doesn't do
+anything else and especially does not have any interactive logins other than
+root, a UNIX domain socket should take care of that.
+
+If you (want to) use the network capability, i.E<nbsp>e. let the daemon bind to
+an IPv4 or IPv6 socket, it is B<your> job to install a packet filter or similar
+mechanism to prevent unauthorized connections. Unless you have a dedicated VLAN
+or VPN for this, using the network option is probably a bad idea!
+
+The daemon will blindly write to any file it gets told, so you really should
+create a separate user just for this daemon. Also it does not do any sanity
+checks, so if it gets told to write values for a time far in the future, your
+files will be messed up good!
+
+You have been warned.
+
+=head1 PROTOCOL
+
+The daemon communicates with clients using a line based ASCII protocol which is
+easy to read and easy to type. This makes it easy for scripts to implement the
+protocol and possible for users to use L<telnet(1)> to connect to the daemon
+and test stuff "by hand".
+
+The protocol is line based, this means that each record consists of one or more
+lines. A line is terminated by the line feed character C<0x0A>, commonly
+written as C<\n>. In the examples below, this character will be written as
+C<E<lt>LFE<gt>> ("line feed").
+
+After the connection has been established, the client is expected to send a
+"command". A command consists of the command keyword, possibly some arguments,
+and a terminating newline character. For a list of commands, see
+L<Valid Commands> below.
+
+Example:
+
+  FLUSH /tmp/foo.rrd<LF>
+
+The daemon answers with a line consisting of a status code and a short status
+message, separated by one or more space characters. A negative status code
+signals an error, a positive status code or zero signal success. If the status
+code is greater than zero, it indicates the number of lines that follow the
+status line.
+
+Examples:
+
+ 0 Success<LF>
+
+ 2 Two lines follow<LF>
+ This is the first line<LF>
+ And this is the second line<LF>
+
+=head2 Valid Commands
+
+The following commands are understood by the daemon:
+
+=over 4
+
+=item B<FLUSH> I<filename>
+
+Causes the daemon to put I<filename> to the B<head> of the update queue
+(possibly moving it there if the node is already enqueued). The answer will be
+sent B<after> the node has been dequeued.
+
+=item B<HELP> [I<command>]
+
+Returns a short usage message. If no command is given, or I<command> is
+B<HELP>, a list of commands supported by the daemon is returned. Otherwise a
+short description, possibly containing a pointer to a manual page, is returned.
+Obviously, this is meant for interactive usage and the format in which the
+commands and usage summaries are returned is not well defined.
+
+=item B<STATS>
+
+Returns a list of metrics which can be used to measure the daemons performance
+and check its status. For a description of the values returned, see
+L<Performance Values> below.
+
+The format in which the values are returned is similar to many other line based
+protocols: Each value is printed on a separate line, each consisting of the
+name of the value, a colon, one or more spaces and the actual value.
+
+Example:
+
+ 9 Statistics follow
+ QueueLength: 0
+ UpdatesReceived: 30
+ FlushesReceived: 2
+ UpdatesWritten: 13
+ DataSetsWritten: 390
+ TreeNodesNumber: 13
+ TreeDepth: 4
+ JournalBytes: 190
+ JournalRotate: 0
+
+=item B<UPDATE> I<filename> I<values> [I<values> ...]
+
+Adds more data to a filename. This is B<the> operation the daemon was designed
+for, so describing the mechanism again is unnecessary. Read L<HOW IT WORKS>
+above for a detailed explanation.
+
+=item B<WROTE> I<filename>
+
+This command is written to the journal after a file is successfully
+written out to disk.  It is used during journal replay to determine which
+updates have already been applied.  It is I<only> valid in the journal; it
+is not accepted from the other command channels.
+
+=back
+
+=head2 Performance Values
+
+The following counters are returned by the B<STATS> command:
+
+=over 4
+
+=item B<QueueLength> I<(unsigned 64bit integer)>
+
+Number of nodes currently enqueued in the update queue.
+
+=item B<UpdatesReceived> I<(unsigned 64bit integer)>
+
+Number of UPDATE commands received.
+
+=item B<FlushesReceived> I<(unsigned 64bit integer)>
+
+Number of FLUSH commands received.
+
+=item B<UpdatesWritten> I<(unsigned 64bit integer)>
+
+Total number of updates, i.E<nbsp>e. calls to C<rrd_update_r>, since the
+daemon was started.
+
+=item B<DataSetsWritten> I<(unsigned 64bit integer)>
+
+Total number of "data sets" written to disk since the daemon was started. A
+data set is one or more values passed to the B<UPDATE> command. For example:
+C<N:123:456> is one data set with two values. The term "data set" is used to
+prevent confusion whether individual values or groups of values are counted.
+
+=item B<TreeNodesNumber> I<(unsigned 64bit integer)>
+
+Number of nodes in the cache.
+
+=item B<TreeDepth> I<(unsigned 64bit integer)>
+
+Depth of the tree used for fast key lookup.
+
+=item B<JournalBytes> I<(unsigned 64bit integer)>
+
+Total number of bytes written to the journal since startup.
+
+=item B<JournalRotate> I<(unsigned 64bit integer)>
+
+Number of times the journal has been rotated since startup.
+
+=back
+
+=head1 BUGS
+
+No known bugs at the moment.
+
+=head1 SEE ALSO
+
+L<rrdtool(1)>, L<rrdgraph(1)>
+
+=head1 AUTHOR
+
+B<rrdcached> and this manual page have been written by Florian Forster
+E<lt>octoE<nbsp>atE<nbsp>verplant.orgE<gt>.
+
+=head1 CONTRIBUTORS
+
+kevin brintnall E<lt>kbrint@rufus.netE<gt>
+
+=cut
+
index a698d84..f48a6d4 100644 (file)
@@ -4,11 +4,16 @@ rrddump - dump the contents of an RRD to XML format
 
 =head1 SYNOPSIS
 
-B<rrdtool> B<dump> S<[B<--no-header>|B<-n>]> I<filename.rrd> E<gt> I<filename.xml>
+B<rrdtool> B<dump> I<filename.rrd>
+S<[B<--no-header>|B<-n>]>
+S<[B<--daemon> I<address>]>
+S<E<gt> I<filename.xml>>
 
 or 
 
-B<rrdtool> B<dump> S<[B<--no-header>|B<-n>]> I<filename.rrd> I<filename.xml>
+B<rrdtool> B<dump> I<filename.rrd> I<filename.xml>
+S<[B<--no-header>|B<-n>]>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -31,13 +36,24 @@ The name of the B<RRD> you want to dump.
 The (optional) filename that you want to write the XML output to.
 If not specified, the XML will be printed to stdout.
 
-=item S<[B<--no-header>|B<-n>]>
+=item B<--no-header>|B<-n>
 
 In rrdtool 1.3, the dump function started producing correct xml-headers.
 Unfortunately the rrdtool restore function from the 1.2 series can not
 handle these headers. With this option you can supress the creatinon of
 the xml headers.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool dump --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
 =back
 
 =head1 EXAMPLES
@@ -62,6 +78,21 @@ B<rrdrestore> for details.
 
 =back
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>dump>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 AUTHOR
 
 Tobias Oetiker E<lt>tobi@oetiker.chE<gt>
index 51b5ccd..c6a0b1e 100644 (file)
@@ -8,6 +8,7 @@ B<rrdtool> B<fetch> I<filename> I<CF>
 S<[B<--resolution>|B<-r> I<resolution>]>
 S<[B<--start>|B<-s> I<start>]>
 S<[B<--end>|B<-e> I<end>]>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -48,6 +49,17 @@ the end of the time series in seconds since epoch. See also AT-STYLE
 TIME SPECIFICATION section for a detailed explanation of how to
 specify the end time.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool fetch --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd AVERAGE
+
 =back
 
 =head2 RESOLUTION INTERVAL
@@ -257,6 +269,22 @@ I<931225537> -- 18:45  July 5th, 1999
 I<19970703 12:45> -- 12:45  July 3th, 1997
 (my favorite, and its even got an ISO number (8601)).
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>fetch>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 AUTHOR
 
-Tobias Oetiker <tobi@oetiker.ch>
+Tobias Oetiker E<lt>tobi@oetiker.chE<gt>
+
diff --git a/doc/rrdflush.pod b/doc/rrdflush.pod
new file mode 100644 (file)
index 0000000..031c29a
--- /dev/null
@@ -0,0 +1,54 @@
+=head1 NAME
+
+rrdflush - Flush the values for a spcific RRD file from memory.
+
+=head1 SYNOPSIS
+
+B<rrdtool> B<flush> I<filename>
+S<[B<--daemon> I<address>]>
+
+=head1 DESCRIPTION
+
+The B<flush> function connects to L<rrdcached(1)>, the RRD caching daemon, and
+issues a "flush" command for the given file. The daemon will put this file to
+the head of the update queue so it is written "soon". The status will be
+returned after the node has been B<dequeued> by the update thread. By the time
+execution of this command ends it is very likely that the update thread has
+just updated the requested file, though this is not guaranteed.
+
+=over 8
+
+=item I<filename>
+
+The name of the B<RRD> that is to be written to disk.
+
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If not specified, the RRDCACHED_ADDRESS
+environment variable must be set (see below). To specify a UNIX domain socket
+use the prefix C<unix:>, see example below. Other addresses are interpreted as
+normal network addresses, i.E<nbsp>e. IPv4 or IPv6 addresses in most cases.
+
+ rrdtool flush --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
+=back
+
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>flush>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
+=head1 AUTHOR
+
+Florian Forster E<lt>octoE<nbsp>atE<nbsp>verplant.orgE<gt>
+
index c2e9b3d..94260d9 100644 (file)
@@ -248,7 +248,7 @@ to the more robust B<--alt-y-grid> mode.
 
 How many digits should rrdtool assume the y-axis labels to be? You
 may have to use this option to make enough space once you start
-fideling with the y-axis labeling.
+fiddling with the y-axis labeling.
 
 [B<--units=si>]
 
@@ -267,6 +267,17 @@ Note, that only the image size will be returned, if you run with lazy even
 when using graphv and even when using PRINT.
 
 
+[B<--daemon> I<address>]
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows the graph to contain
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool graph [...] --daemon unix:/var/run/rrdcached.sock [...]
+
 [B<-f>|B<--imginfo> I<printfstr>]
 
 After the image has been created, the graph function uses printf
@@ -469,6 +480,21 @@ There is more information returned than in the standard interface.
 Especially the 'graph_*' keys are new. They help applications that want to
 know what is where on the graph.
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>graph>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 SEE ALSO
 
 L<rrdgraph> gives an overview of how B<rrdtool graph> works.
index e83d8d6..e8e59c6 100644 (file)
@@ -4,7 +4,8 @@ rrdinfo - extract header information from an RRD
 
 =head1 SYNOPSIS
 
-B<rrdtool> B<info> I<filename.rrd>
+B<rrdtool> B<info> I<filename>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -14,6 +15,25 @@ a parsing friendly format.
 Check L<rrdcreate> if you are uncertain about the meaning of the
 individual keys.
 
+=over 8
+
+=item I<filename>
+
+The name of the B<RRD> you want to examine.
+
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool info --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
+=back
+
 =head1 EXAMPLE
 
 This is the output generated by running B<info> on a simple RRD which
@@ -48,11 +68,18 @@ data sources.
  rra[0].cdp_prep[1].value = nan
  rra[0].cdp_prep[1].unknown_datapoints = 0
 
-=over 8
+=head1 ENVIRONMENT VARIABLES
 
-=item I<filename.rrd>
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>info>:
 
-The name of the B<RRD> you want to examine.
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
 
 =back
 
index a3eb7fe..b3dcec1 100644 (file)
@@ -5,6 +5,7 @@ rrdlast - Return the date of the last data sample in an RRD
 =head1 SYNOPSIS
 
 B<rrdtool> B<last> I<filename>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -17,6 +18,32 @@ update of the RRD.
 
 The name of the B<RRD> that contains the data.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool last --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
+=back
+
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>last>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
 =back
 
 =head1 AUTHOR
index 37013cf..72b8682 100644 (file)
@@ -5,6 +5,7 @@ rrdlastupdate - Return the most recent update to an RRD
 =head1 SYNOPSIS
 
 B<rrdtool> B<lastupdate> I<filename>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -17,11 +18,37 @@ value stored for each datum in the most recent update of an RRD.
 
 The name of the B<RRD> that contains the data.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool lastupdate --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
+=back
+
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>lastupdate>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
 =back
 
 =head1 AUTHOR
 
-Andy Riebs <andy.riebs@hp.com>
+Andy Riebs E<lt>andy.riebs@hp.comE<gt>
 
 
 
index 154afe7..835f64b 100644 (file)
@@ -91,7 +91,11 @@ Change the size of individual RRAs. This is dangerous! Check L<rrdresize>.
 
 =item B<xport>
 
-Export data retrieved from one or several RRDs. Check L<rrdxport>
+Export data retrieved from one or several RRDs. Check L<rrdxport>.
+
+=item B<flush>
+
+Flush the values for a spcific RRD file from memory. Check L<rrdflush>.
 
 =item B<rrdcgi>
 
@@ -298,9 +302,17 @@ sockets, tools like netcat, or in a quick interactive test by using
 B<NOTE:> that there is no authentication with this feature! Do not setup
 such a port unless you are sure what you are doing.
 
+=head1 RRDCACHED, THE CACHING DAEMON
+
+For very big setups, updating thousands of RRD files often becomes a serious IO
+problem. If you run into such problems, you might want to take a look at
+L<rrdcached(1)>, a caching daemon for RRDTool which may help you lessen the
+stress on your disks.
+
 =head1 SEE ALSO
 
-rrdcreate, rrdupdate, rrdgraph, rrddump, rrdfetch, rrdtune, rrdlast, rrdxport
+rrdcreate, rrdupdate, rrdgraph, rrddump, rrdfetch, rrdtune, rrdlast, rrdxport,
+rrdflush, rrdcached
 
 =head1 BUGS
 
index cc0b452..d61cf12 100644 (file)
@@ -6,6 +6,7 @@ rrdupdate - Store a new set of values into the RRD
 
 B<rrdtool> {B<update> | B<updatev>} I<filename>
 S<[B<--template>|B<-t> I<ds-name>[B<:>I<ds-name>]...]>
+S<[B<--daemon> I<address>]>
 S<B<N>|I<timestamp>B<:>I<value>[B<:>I<value>...]>
 S<I<at-timestamp>B<@>I<value>[B<:>I<value>...]>
 S<[I<timestamp>B<:>I<value>[B<:>I<value>...] ...]>
@@ -29,6 +30,9 @@ RRA (consolidation function and PDPs per CDP), and data source (name).
 Note that depending on the arguments of the current and previous call to
 update, the list may have no entries or a large number of entries.
 
+Since B<updatev> requires direct disk access, the B<--daemon> option cannot be
+used with this command.
+
 =item I<filename>
 
 The name of the B<RRD> you want to update.
@@ -56,6 +60,18 @@ function. If this is done accidentally (and this can only be done
 using the template switch), B<RRDtool> will ignore the value specified
 for the COMPUTE B<DST>.
 
+=item B<--daemon> I<address>
+
+If given, B<RRDTool> will try to connect to the caching daemon L<rrdcached(1)>
+at I<address> and will fail if the connection cannot be established. If the
+connection is successfully established the values will be sent to the daemon
+instead of accessing the files directly. If I<address> begins with C<unix:>
+then everything after this prefix will be considered to be a UNIX domain
+socket, see L<EXAMPLES> below. Otherwise the address is interpreted as network
+address or node name as understood by L<getaddrinfo(3)>. One practical
+consequence is that both, IPv4 and IPv6, may be used if the system supports
+it. This option is available for the B<update> command only.
+
 =item B<N>|I<timestamp>B<:>I<value>[B<:>I<value>...]
 
 The data used for updating the RRD was acquired at a certain
@@ -82,20 +98,65 @@ separator.
 
 =back
 
-=head1 EXAMPLE
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>update>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
+=head1 EXAMPLES
+
+=over 4
+
+=item
 
 C<rrdtool update demo1.rrd N:3.44:3.15:U:23>
 
 Update the database file demo1.rrd with 3 known and one I<*UNKNOWN*>
 value. Use the current time as the update time.
 
+=item
+
 C<rrdtool update demo2.rrd 887457267:U 887457521:22 887457903:2.7>
 
 Update the database file demo2.rrd which expects data from a single
 data-source, three times. First with an I<*UNKNOWN*> value then with two
 regular readings. The update interval seems to be around 300 seconds.
 
-=head1 AUTHOR
+=item
+
+C<rrdtool update --cache /var/lib/rrd/demo3.rrd N:42>
+
+Update the file C</var/lib/rrd/demo3.rrd> with a single data source, using the
+current time. If the caching daemon cannot be reached, do B<not> fall back to
+direct file access.
+
+=item
+
+C<rrdtool update --daemon unix:/tmp/rrdd.sock demo4.rrd N:23>
+
+Use the UNIX domain socket C</tmp/rrdd.sock> to contact the caching daemon. If
+the caching daemon is not available, update the file C<demo4.rrd> directly.
+B<WARNING:> Since a relative path is specified, the following disturbing effect
+may occur: If the daemon is available, the file relative to the working
+directory B<of the daemon> is used. If the daemon is not available, the file
+relative to the current working directory of the invoking process is used.
+B<This may update two different files depending on whether the daemon could be
+reached or not.> Don't do relative paths, kids!
+
+=back
+
+=head1 AUTHORS
 
-Tobias Oetiker <tobi@oetiker.ch>
+Tobias Oetiker <tobi@oetiker.ch>,
+Florian Forster <octoE<nbsp>atE<nbsp>verplant.org>
 
index a668a20..0c88a33 100644 (file)
@@ -9,6 +9,7 @@ S<[B<-s>|B<--start> I<seconds>]>
 S<[B<-e>|B<--end> I<seconds>]>
 S<[B<-m>|B<--maxrows> I<rows>]>
 S<[B<--step> I<value>]>
+S<[B<--daemon> I<address>]>
 S<[B<DEF:>I<vname>B<=>I<rrd>B<:>I<ds-name>B<:>I<CF>]>
 S<[B<CDEF:>I<vname>B<=>I<rpn-expression>]>
 S<[B<XPORT>B<:>I<vname>[B<:>I<legend>]]>
@@ -48,6 +49,17 @@ for details.
 
 See L<rrdgraph> documentation.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached(1)> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool xport --daemon unix:/var/run/rrdcached.sock ...
+
 =item B<--enumds> 
 
 The generated xml should contain the data values in enumerated tags.
@@ -137,6 +149,20 @@ The resulting data section is:
           XPORT:out2:"if2 out bytes" \
          XPORT:sum:"output sum"
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>xport>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
 
 =head1 AUTHOR
 
index 8ff671d..420aeab 100644 (file)
@@ -23,6 +23,7 @@ UPD_C_FILES =         \
        rrd_info.c      \
        rrd_error.c     \
        rrd_open.c      \
+       rrd_client.c    \
        rrd_nan_inf.c   \
        rrd_rpncalc.c   \
        rrd_update.c
@@ -41,6 +42,7 @@ RRD_C_FILES =         \
        rrd_xport.c     \
        rrd_gfx.c \
        rrd_dump.c      \
+       rrd_flush.c     \
        rrd_fetch.c     \
        rrd_resize.c \
        rrd_tune.c
@@ -82,9 +84,9 @@ librrd_th_la_LDFLAGS         = $(MULTITHREAD_LDFLAGS) -version-info @LIBVERS@
 librrd_th_la_LDFLAGS         += -export-symbols librrd.sym
 librrd_th_la_LIBADD          = $(ALL_LIBS)
 
-include_HEADERS        = rrd.h rrd_format.h
+include_HEADERS        = rrd.h rrd_format.h rrd_client.h
 
-bin_PROGRAMS   = rrdtool rrdupdate
+bin_PROGRAMS   = rrdtool rrdupdate rrdcached
 
 if BUILD_RRDCGI
 bin_PROGRAMS += rrdcgi
@@ -100,6 +102,11 @@ rrdtool_SOURCES = rrd_tool.c
 rrdtool_DEPENDENCIES = librrd.la
 rrdtool_LDADD  = librrd.la
 
+rrdcached_SOURCES = rrd_daemon.c
+rrdcached_DEPENDENCIES = librrd.la
+rrdcached_CPPFLAGS = -DVERSION='"$(VERSION)"' -DLOCALSTATEDIR='"$(localstatedir)"'
+rrdcached_LDADD = librrd.la
+
 # strftime is here because we do not usually need it. unices have propper
 # iso date support
 EXTRA_DIST= strftime.c strftime.h  rrd_getopt.c rrd_getopt1.c rrd_getopt.h \
index a178f55..ae5da6f 100644 (file)
@@ -1,5 +1,6 @@
 rrd_clear_error
 rrd_close
+rrd_cmd_flush
 rrd_create
 rrd_create_r
 rrd_dontneed
@@ -25,6 +26,7 @@ rrd_init
 rrd_last
 rrd_last_r
 rrd_lastupdate
+rrd_lastupdate_r
 rrd_lock
 rrd_new_context
 rrd_open
@@ -48,4 +50,11 @@ rrd_update_v
 rrd_version
 rrd_write
 rrd_xport
+rrdc_connect
+rrdc_is_connected
+rrdc_disconnect
+rrdc_flush
+rrdc_stats_free
+rrdc_stats_get
+rrdc_update
 @RRD_GETOPT_LONG@
index a428370..ad7e15c 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -171,13 +171,7 @@ extern    "C" {
     time_t    rrd_last(
     int,
     char **);
-    int       rrd_lastupdate(
-    int argc,
-    char **argv,
-    time_t *last_update,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    char ***last_ds);
+    int rrd_lastupdate(int argc, char **argv);
     time_t    rrd_first(
     int,
     char **);
@@ -198,6 +192,7 @@ extern    "C" {
     unsigned long *,
     char ***,
     rrd_value_t **);
+    int       rrd_cmd_flush (int argc, char **argv);
 
     void      rrd_freemem(
     void *mem);
@@ -217,20 +212,24 @@ extern    "C" {
     const char *_template,
     int argc,
     const char **argv);
-    int       rrd_fetch_r(
-    const char *filename,
-    const char *cf,
-    time_t *start,
-    time_t *end,
-    unsigned long *step,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    rrd_value_t **data);
+    int rrd_fetch_r (
+            const char *filename,
+            const char *cf,
+            time_t *start,
+            time_t *end,
+            unsigned long *step,
+            unsigned long *ds_cnt,
+            char ***ds_namv,
+            rrd_value_t **data);
     int       rrd_dump_r(
     const char *filename,
     char *outname);
-    time_t    rrd_last_r(
-    const char *filename);
+    time_t    rrd_last_r (const char *filename);
+    int rrd_lastupdate_r (const char *filename,
+            time_t *ret_last_update,
+            unsigned long *ret_ds_count,
+            char ***ret_ds_names,
+            char ***ret_last_ds);
     time_t    rrd_first_r(
     const char *filename,
     int rraindex);
diff --git a/src/rrd_client.c b/src/rrd_client.c
new file mode 100644 (file)
index 0000000..d1ad5e0
--- /dev/null
@@ -0,0 +1,792 @@
+/**
+ * RRDTool - src/rrd_client.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#include "rrd.h"
+#include "rrd_client.h"
+#include "rrd_tool.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
+
+#ifndef ENODATA
+#define ENODATA ENOENT
+#endif
+
+struct rrdc_response_s
+{
+  int status;
+  char *message;
+  char **lines;
+  size_t lines_num;
+};
+typedef struct rrdc_response_s rrdc_response_t;
+
+static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+static int sd = -1;
+static char *sd_path = NULL; /* cache the path for sd */
+static void _disconnect(void);
+
+static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */
+{
+  char    *buffer;
+  size_t   buffer_used;
+  size_t   buffer_free;
+  ssize_t  status;
+
+  buffer       = (char *) buffer_void;
+  buffer_used  = 0;
+  buffer_free  = buffer_size;
+
+  while (buffer_free > 0)
+  {
+    status = read (sd, buffer + buffer_used, buffer_free);
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (-1);
+
+    if (status == 0)
+    {
+      _disconnect();
+      errno = EPROTO;
+      return (-1);
+    }
+
+    assert ((0 > status) || (buffer_free >= (size_t) status));
+
+    buffer_free -= status;
+    buffer_used += status;
+
+    if (buffer[buffer_used - 1] == '\n')
+      break;
+  }
+
+  if (buffer[buffer_used - 1] != '\n')
+  {
+    errno = ENOBUFS;
+    return (-1);
+  }
+
+  buffer[buffer_used - 1] = '\0';
+  return (buffer_used);
+} /* }}} ssize_t sread */
+
+static ssize_t swrite (const void *buf, size_t count) /* {{{ */
+{
+  const char *ptr;
+  size_t      nleft;
+  ssize_t     status;
+
+  ptr   = (const char *) buf;
+  nleft = count;
+
+  while (nleft > 0)
+  {
+    status = write (sd, (const void *) ptr, nleft);
+
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+    {
+      _disconnect();
+      rrd_set_error("lost connection to rrdcached");
+      return (status);
+    }
+
+    nleft -= status;
+    ptr   += status;
+  }
+
+  return (0);
+} /* }}} ssize_t swrite */
+
+static int buffer_add_string (const char *str, /* {{{ */
+    char **buffer_ret, size_t *buffer_size_ret)
+{
+  char *buffer;
+  size_t buffer_size;
+  size_t buffer_pos;
+  size_t i;
+  int status;
+
+  buffer = *buffer_ret;
+  buffer_size = *buffer_size_ret;
+  buffer_pos = 0;
+
+  i = 0;
+  status = -1;
+  while (buffer_pos < buffer_size)
+  {
+    if (str[i] == 0)
+    {
+      buffer[buffer_pos] = ' ';
+      buffer_pos++;
+      status = 0;
+      break;
+    }
+    else if ((str[i] == ' ') || (str[i] == '\\'))
+    {
+      if (buffer_pos >= (buffer_size - 1))
+        break;
+      buffer[buffer_pos] = '\\';
+      buffer_pos++;
+      buffer[buffer_pos] = str[i];
+      buffer_pos++;
+    }
+    else
+    {
+      buffer[buffer_pos] = str[i];
+      buffer_pos++;
+    }
+    i++;
+  } /* while (buffer_pos < buffer_size) */
+
+  if (status != 0)
+    return (-1);
+
+  *buffer_ret = buffer + buffer_pos;
+  *buffer_size_ret = buffer_size - buffer_pos;
+
+  return (0);
+} /* }}} int buffer_add_string */
+
+static int buffer_add_value (const char *value, /* {{{ */
+    char **buffer_ret, size_t *buffer_size_ret)
+{
+  char temp[4096];
+
+  if (strncmp (value, "N:", 2) == 0)
+    snprintf (temp, sizeof (temp), "%lu:%s",
+        (unsigned long) time (NULL), value + 2);
+  else
+    strncpy (temp, value, sizeof (temp));
+  temp[sizeof (temp) - 1] = 0;
+
+  return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
+} /* }}} int buffer_add_value */
+
+static int response_parse (char *buffer, size_t buffer_size, /* {{{ */
+    rrdc_response_t **ret_response)
+{
+  rrdc_response_t *ret;
+
+  char *dummy;
+  char *saveptr;
+
+  char *line_ptr;
+  size_t line_counter;
+
+  if (buffer == NULL)
+    return (EINVAL);
+  if (buffer_size <= 0)
+    return (EINVAL);
+
+  if (buffer[buffer_size - 1] != 0)
+    return (-1);
+
+  ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
+  if (ret == NULL)
+    return (ENOMEM);
+  memset (ret, 0, sizeof (*ret));
+
+  line_counter = 0;
+
+  dummy = buffer;
+  saveptr = NULL;
+  while ((line_ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL)
+  {
+    dummy = NULL;
+
+    if (ret->message == NULL)
+    {
+      ret->status = strtol (buffer, &ret->message, 0);
+      if (buffer == ret->message)
+      {
+        free (ret);
+        return (EPROTO);
+      }
+
+      /* Skip leading whitespace of the status message */
+      ret->message += strspn (ret->message, " \t");
+
+      if (ret->status > 0)
+      {
+        ret->lines = (char **) malloc (sizeof (char *) * ret->status);
+        if (ret->lines == NULL)
+        {
+          free (ret);
+          return (ENOMEM);
+        }
+        memset (ret->lines, 0, sizeof (char *) * ret->status);
+        ret->lines_num = (size_t) ret->status;
+      }
+      else
+      {
+        ret->lines = NULL;
+        ret->lines_num = 0;
+      }
+    }
+    else /* if (ret->message != NULL) */
+    {
+      if (line_counter < ret->lines_num)
+        ret->lines[line_counter] = line_ptr;
+      line_counter++;
+    }
+  } /* while (strtok_r) */
+
+  if (ret->lines_num != line_counter)
+  {
+    errno = EPROTO;
+    if (ret->lines != NULL)
+      free (ret->lines);
+    free (ret);
+    return (-1);
+  }
+
+  *ret_response = ret;
+  return (0);
+} /* }}} int response_parse */
+
+static void response_free (rrdc_response_t *res) /* {{{ */
+{
+  if (res == NULL)
+    return;
+
+  if (res->lines != NULL)
+  {
+    res->lines_num = 0;
+    free (res->lines);
+    res->lines = NULL;
+  }
+
+  free (res);
+} /* }}} void response_free */
+
+
+/* determine whether we are connected to the specified daemon_addr if
+ * NULL, return whether we are connected at all
+ */
+int rrdc_is_connected(const char *daemon_addr) /* {{{ */
+{
+  if (sd < 0)
+    return 0;
+  else if (daemon_addr == NULL)
+  {
+    /* here we have to handle the case i.e.
+     *   UPDATE --daemon ...; UPDATEV (no --daemon) ...
+     * In other words: we have a cached connection,
+     * but it is not specified in the current command.
+     * Daemon is only implied in this case if set in ENV
+     */
+    if (getenv(ENV_RRDCACHED_ADDRESS) != NULL)
+      return 1;
+    else
+      return 0;
+  }
+  else if (strcmp(daemon_addr, sd_path) == 0)
+    return 1;
+  else
+    return 0;
+
+} /* }}} int rrdc_is_connected */
+
+static int rrdc_connect_unix (const char *path) /* {{{ */
+{
+  struct sockaddr_un sa;
+  int status;
+
+  assert (path != NULL);
+  assert (sd == -1);
+
+  sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
+  if (sd < 0)
+  {
+    status = errno;
+    return (status);
+  }
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sun_family = AF_UNIX;
+  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
+
+  status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
+  if (status != 0)
+  {
+    status = errno;
+    return (status);
+  }
+
+  return (0);
+} /* }}} int rrdc_connect_unix */
+
+static int rrdc_connect_network (const char *addr) /* {{{ */
+{
+  struct addrinfo ai_hints;
+  struct addrinfo *ai_res;
+  struct addrinfo *ai_ptr;
+
+  assert (addr != NULL);
+  assert (sd == -1);
+
+  int status;
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+
+  ai_res = NULL;
+  status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
+  if (status != 0)
+    return (status);
+
+  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (sd < 0)
+    {
+      status = errno;
+      sd = -1;
+      continue;
+    }
+
+    status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      status = errno;
+      _disconnect();
+      continue;
+    }
+
+    assert (status == 0);
+    break;
+  } /* for (ai_ptr) */
+
+  return (status);
+} /* }}} int rrdc_connect_network */
+
+int rrdc_connect (const char *addr) /* {{{ */
+{
+  int status = 0;
+
+  if (addr == NULL)
+    addr = getenv (ENV_RRDCACHED_ADDRESS);
+
+  if (addr == NULL)
+    return 0;
+
+  pthread_mutex_lock(&lock);
+
+  if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0)
+  {
+    /* connection to the same daemon; use cached connection */
+    pthread_mutex_unlock (&lock);
+    return (0);
+  }
+  else
+  {
+    _disconnect();
+  }
+
+  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
+    status = rrdc_connect_unix (addr + strlen ("unix:"));
+  else if (addr[0] == '/')
+    status = rrdc_connect_unix (addr);
+  else
+    status = rrdc_connect_network(addr);
+
+  if (status == 0 && sd >= 0)
+    sd_path = strdup(addr);
+  else
+    rrd_set_error("Unable to connect to rrdcached: %s",
+                  (status < 0)
+                  ? "Internal error"
+                  : rrd_strerror (status));
+
+  pthread_mutex_unlock (&lock);
+  return (status);
+} /* }}} int rrdc_connect */
+
+static void _disconnect(void) /* {{{ */
+{
+  if (sd >= 0)
+    close(sd);
+
+  if (sd_path != NULL)
+    free(sd_path);
+
+  sd = -1;
+  sd_path = NULL;
+} /* }}} static void _disconnect(void) */
+
+int rrdc_disconnect (void) /* {{{ */
+{
+  pthread_mutex_lock (&lock);
+
+  _disconnect();
+
+  pthread_mutex_unlock (&lock);
+
+  return (0);
+} /* }}} int rrdc_disconnect */
+
+int rrdc_update (const char *filename, int values_num, /* {{{ */
+               const char * const *values)
+{
+  char buffer[4096];
+  char *buffer_ptr;
+  size_t buffer_free;
+  size_t buffer_size;
+  int status;
+  int i;
+
+  memset (buffer, 0, sizeof (buffer));
+  buffer_ptr = &buffer[0];
+  buffer_free = sizeof (buffer);
+
+  status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  for (i = 0; i < values_num; i++)
+  {
+    status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
+    if (status != 0)
+      return (ENOBUFS);
+  }
+
+  assert (buffer_free < sizeof (buffer));
+  buffer_size = sizeof (buffer) - buffer_free;
+  assert (buffer[buffer_size - 1] == ' ');
+  buffer[buffer_size - 1] = '\n';
+
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENOTCONN);
+  }
+
+  status = swrite (buffer, buffer_size);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  status = sread (buffer, sizeof (buffer));
+  if (status < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+  else if (status == 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENODATA);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  status = atoi (buffer);
+  return (status);
+} /* }}} int rrdc_update */
+
+int rrdc_flush (const char *filename) /* {{{ */
+{
+  char buffer[4096];
+  char *buffer_ptr;
+  size_t buffer_free;
+  size_t buffer_size;
+  int status;
+
+  if (filename == NULL)
+    return (-1);
+
+  memset (buffer, 0, sizeof (buffer));
+  buffer_ptr = &buffer[0];
+  buffer_free = sizeof (buffer);
+
+  status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  assert (buffer_free < sizeof (buffer));
+  buffer_size = sizeof (buffer) - buffer_free;
+  assert (buffer[buffer_size - 1] == ' ');
+  buffer[buffer_size - 1] = '\n';
+
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENOTCONN);
+  }
+
+  status = swrite (buffer, buffer_size);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  status = sread (buffer, sizeof (buffer));
+  if (status < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+  else if (status == 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENODATA);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  status = atoi (buffer);
+  return (status);
+} /* }}} int rrdc_flush */
+
+
+
+/* convenience function; if there is a daemon specified, or if we can
+ * detect one from the environment, then flush the file.  Otherwise, no-op
+ */
+int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */
+{
+  int status = 0;
+
+  rrdc_connect(opt_daemon);
+
+  if (rrdc_is_connected(opt_daemon))
+  {
+    status = rrdc_flush (filename);
+    if (status != 0)
+    {
+      rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                     filename, status);
+    }
+  } /* if (daemon_addr) */
+
+  return status;
+} /* }}} int rrdc_flush_if_daemon */
+
+
+int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
+{
+  rrdc_stats_t *head;
+  rrdc_stats_t *tail;
+
+  rrdc_response_t *response;
+
+  char buffer[4096];
+  size_t buffer_size;
+  int status;
+  size_t i;
+
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENOTCONN);
+  }
+
+  /* Protocol example: {{{
+   * ->  STATS
+   * <-  5 Statistics follow
+   * <-  QueueLength: 0
+   * <-  UpdatesWritten: 0
+   * <-  DataSetsWritten: 0
+   * <-  TreeNodesNumber: 0
+   * <-  TreeDepth: 0
+   * }}} */
+  status = swrite ("STATS\n", strlen ("STATS\n"));
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  status = sread (buffer, sizeof (buffer));
+  if (status < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+  else if (status == 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENODATA);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  /* Assert NULL termination */
+  buffer_size = (size_t) status;
+  if (buffer[buffer_size - 1] != 0)
+  {
+    if (buffer_size < sizeof (buffer))
+    {
+      buffer[buffer_size] = 0;
+      buffer_size++;
+    }
+    else
+    {
+      return (ENOBUFS);
+    }
+  }
+
+  status = response_parse (buffer, buffer_size, &response);
+  if (status != 0)
+    return (status);
+
+  if (response->status <= 0)
+  {
+    response_free (response);
+    return (EIO);
+  }
+
+  head = NULL;
+  tail = NULL;
+  for (i = 0; i < response->lines_num; i++)
+  {
+    char *key;
+    char *value;
+    char *endptr;
+    rrdc_stats_t *s;
+
+    key = response->lines[i];
+    value = strchr (key, ':');
+    if (value == NULL)
+      continue;
+    *value = 0;
+    value++;
+
+    while ((value[0] == ' ') || (value[0] == '\t'))
+      value++;
+
+    s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t));
+    if (s == NULL)
+      continue;
+    memset (s, 0, sizeof (*s));
+
+    s->name = strdup (key);
+
+    endptr = NULL;
+    if ((strcmp ("QueueLength", key) == 0)
+        || (strcmp ("TreeNodesNumber", key) == 0)
+        || (strcmp ("TreeDepth", key) == 0))
+    {
+      s->type = RRDC_STATS_TYPE_GAUGE;
+      s->value.gauge = strtod (value, &endptr);
+    }
+    else if ((strcmp ("UpdatesWritten", key) == 0)
+        || (strcmp ("DataSetsWritten", key) == 0))
+    {
+      s->type = RRDC_STATS_TYPE_COUNTER;
+      s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
+    }
+    else
+    {
+      free (s);
+      continue;
+    }
+
+    /* Conversion failed */
+    if (endptr == value)
+    {
+      free (s);
+      continue;
+    }
+
+    if (head == NULL)
+    {
+      head = s;
+      tail = s;
+      s->next = NULL;
+    }
+    else
+    {
+      tail->next = s;
+      tail = s;
+    }
+  } /* for (i = 0; i < response->lines_num; i++) */
+
+  response_free (response);
+
+  if (head == NULL)
+    return (EPROTO);
+
+  *ret_stats = head;
+  return (0);
+} /* }}} int rrdc_stats_get */
+
+void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */
+{
+  rrdc_stats_t *this;
+
+  this = ret_stats;
+  while (this != NULL)
+  {
+    rrdc_stats_t *next;
+
+    next = this->next;
+
+    if (this->name != NULL)
+    {
+      free (this->name);
+      this->name = NULL;
+    }
+    free (this);
+
+    this = next;
+  } /* while (this != NULL) */
+} /* }}} void rrdc_stats_free */
+
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */
diff --git a/src/rrd_client.h b/src/rrd_client.h
new file mode 100644 (file)
index 0000000..1776c2b
--- /dev/null
@@ -0,0 +1,67 @@
+/**
+ * RRDTool - src/rrd_client.h
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#ifndef __RRD_CLIENT_H
+#define __RRD_CLIENT_H 1
+
+#include <stdint.h>
+
+#ifndef RRDCACHED_DEFAULT_ADDRESS
+# define RRDCACHED_DEFAULT_ADDRESS "unix:/tmp/rrdcached.sock"
+#endif
+
+#define RRDCACHED_DEFAULT_PORT "42217"
+#define ENV_RRDCACHED_ADDRESS "RRDCACHED_ADDRESS"
+
+int rrdc_connect (const char *addr);
+int rrdc_is_connected(const char *daemon_addr);
+int rrdc_disconnect (void);
+
+int rrdc_update (const char *filename, int values_num,
+        const char * const *values);
+
+int rrdc_flush (const char *filename);
+int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename);
+
+
+struct rrdc_stats_s
+{
+  const char *name;
+  uint16_t type;
+#define RRDC_STATS_TYPE_GAUGE   0x0001
+#define RRDC_STATS_TYPE_COUNTER 0x0002
+  uint16_t flags;
+  union
+  {
+    uint64_t counter;
+    double   gauge;
+  } value;
+  struct rrdc_stats_s *next;
+};
+typedef struct rrdc_stats_s rrdc_stats_t;
+
+int rrdc_stats_get (rrdc_stats_t **ret_stats);
+void rrdc_stats_free (rrdc_stats_t *ret_stats);
+
+#endif /* __RRD_CLIENT_H */
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */
diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c
new file mode 100644 (file)
index 0000000..79b908d
--- /dev/null
@@ -0,0 +1,2137 @@
+/**
+ * RRDTool - src/rrd_daemon.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ *   kevin brintnall <kbrint@rufus.net>
+ **/
+
+#if 0
+/*
+ * First tell the compiler to stick to the C99 and POSIX standards as close as
+ * possible.
+ */
+#ifndef __STRICT_ANSI__ /* {{{ */
+# define __STRICT_ANSI__
+#endif
+
+#ifndef _ISOC99_SOURCE
+# define _ISOC99_SOURCE
+#endif
+
+#ifdef _POSIX_C_SOURCE
+# undef _POSIX_C_SOURCE
+#endif
+#define _POSIX_C_SOURCE 200112L
+
+/* Single UNIX needed for strdup. */
+#ifdef _XOPEN_SOURCE
+# undef _XOPEN_SOURCE
+#endif
+#define _XOPEN_SOURCE 500
+
+#ifndef _REENTRANT
+# define _REENTRANT
+#endif
+
+#ifndef _THREAD_SAFE
+# define _THREAD_SAFE
+#endif
+
+#ifdef _GNU_SOURCE
+# undef _GNU_SOURCE
+#endif
+/* }}} */
+#endif /* 0 */
+
+/*
+ * Now for some includes..
+ */
+#include "rrd.h" /* {{{ */
+#include "rrd_client.h"
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <strings.h>
+#include <stdint.h>
+#include <inttypes.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
+#include <poll.h>
+#include <syslog.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <glib-2.0/glib.h>
+/* }}} */
+
+#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
+
+#ifndef __GNUC__
+# define __attribute__(x) /**/
+#endif
+
+/*
+ * Types
+ */
+struct listen_socket_s
+{
+  int fd;
+  char path[PATH_MAX + 1];
+};
+typedef struct listen_socket_s listen_socket_t;
+
+struct cache_item_s;
+typedef struct cache_item_s cache_item_t;
+struct cache_item_s
+{
+  char *file;
+  char **values;
+  int values_num;
+  time_t last_flush_time;
+#define CI_FLAGS_IN_TREE  (1<<0)
+#define CI_FLAGS_IN_QUEUE (1<<1)
+  int flags;
+
+  cache_item_t *next;
+};
+
+struct callback_flush_data_s
+{
+  time_t now;
+  time_t abs_timeout;
+  char **keys;
+  size_t keys_num;
+};
+typedef struct callback_flush_data_s callback_flush_data_t;
+
+enum queue_side_e
+{
+  HEAD,
+  TAIL
+};
+typedef enum queue_side_e queue_side_t;
+
+/* max length of socket command or response */
+#define CMD_MAX 4096
+
+/*
+ * Variables
+ */
+static int stay_foreground = 0;
+
+static listen_socket_t *listen_fds = NULL;
+static size_t listen_fds_num = 0;
+
+static int do_shutdown = 0;
+
+static pthread_t queue_thread;
+
+static pthread_t *connection_threads = NULL;
+static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static int connection_threads_num = 0;
+
+/* Cache stuff */
+static GTree          *cache_tree = NULL;
+static cache_item_t   *cache_queue_head = NULL;
+static cache_item_t   *cache_queue_tail = NULL;
+static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
+
+static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
+
+static int config_write_interval = 300;
+static int config_write_jitter   = 0;
+static int config_flush_interval = 3600;
+static char *config_pid_file = NULL;
+static char *config_base_dir = NULL;
+
+static char **config_listen_address_list = NULL;
+static int config_listen_address_list_len = 0;
+
+static uint64_t stats_queue_length = 0;
+static uint64_t stats_updates_received = 0;
+static uint64_t stats_flush_received = 0;
+static uint64_t stats_updates_written = 0;
+static uint64_t stats_data_sets_written = 0;
+static uint64_t stats_journal_bytes = 0;
+static uint64_t stats_journal_rotate = 0;
+static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* Journaled updates */
+static char *journal_cur = NULL;
+static char *journal_old = NULL;
+static FILE *journal_fh = NULL;
+static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
+static int journal_write(char *cmd, char *args);
+static void journal_done(void);
+static void journal_rotate(void);
+
+/* 
+ * Functions
+ */
+static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
+{
+  RRDD_LOG(LOG_NOTICE, "caught SIGINT");
+  do_shutdown++;
+  pthread_cond_broadcast(&cache_cond);
+} /* }}} void sig_int_handler */
+
+static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
+{
+  RRDD_LOG(LOG_NOTICE, "caught SIGTERM");
+  do_shutdown++;
+  pthread_cond_broadcast(&cache_cond);
+} /* }}} void sig_term_handler */
+
+static int write_pidfile (void) /* {{{ */
+{
+  pid_t pid;
+  char *file;
+  int fd;
+  FILE *fh;
+
+  pid = getpid ();
+  
+  file = (config_pid_file != NULL)
+    ? config_pid_file
+    : LOCALSTATEDIR "/run/rrdcached.pid";
+
+  fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
+  if (fd < 0)
+  {
+    RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)",
+             file, rrd_strerror(errno));
+    return (-1);
+  }
+
+  fh = fdopen (fd, "w");
+  if (fh == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
+    close(fd);
+    return (-1);
+  }
+
+  fprintf (fh, "%i\n", (int) pid);
+  fclose (fh);
+
+  return (0);
+} /* }}} int write_pidfile */
+
+static int remove_pidfile (void) /* {{{ */
+{
+  char *file;
+  int status;
+
+  file = (config_pid_file != NULL)
+    ? config_pid_file
+    : LOCALSTATEDIR "/run/rrdcached.pid";
+
+  status = unlink (file);
+  if (status == 0)
+    return (0);
+  return (errno);
+} /* }}} int remove_pidfile */
+
+static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
+{
+  char    *buffer;
+  size_t   buffer_used;
+  size_t   buffer_free;
+  ssize_t  status;
+
+  buffer       = (char *) buffer_void;
+  buffer_used  = 0;
+  buffer_free  = buffer_size;
+
+  while (buffer_free > 0)
+  {
+    status = read (fd, buffer + buffer_used, buffer_free);
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (-1);
+
+    if (status == 0)
+      return (0);
+
+    assert ((0 > status) || (buffer_free >= (size_t) status));
+
+    buffer_free = buffer_free - status;
+    buffer_used = buffer_used + status;
+
+    if (buffer[buffer_used - 1] == '\n')
+      break;
+  }
+
+  assert (buffer_used > 0);
+
+  if (buffer[buffer_used - 1] != '\n')
+  {
+    errno = ENOBUFS;
+    return (-1);
+  }
+
+  buffer[buffer_used - 1] = 0;
+
+  /* Fix network line endings. */
+  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
+  {
+    buffer_used--;
+    buffer[buffer_used - 1] = 0;
+  }
+
+  return (buffer_used);
+} /* }}} ssize_t sread */
+
+static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
+{
+  const char *ptr;
+  size_t      nleft;
+  ssize_t     status;
+
+  /* special case for journal replay */
+  if (fd < 0) return 0;
+
+  ptr   = (const char *) buf;
+  nleft = count;
+
+  while (nleft > 0)
+  {
+    status = write (fd, (const void *) ptr, nleft);
+
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (status);
+
+    nleft -= status;
+    ptr   += status;
+  }
+
+  return (0);
+} /* }}} ssize_t swrite */
+
+static void _wipe_ci_values(cache_item_t *ci, time_t when)
+{
+  ci->values = NULL;
+  ci->values_num = 0;
+
+  ci->last_flush_time = when;
+  if (config_write_jitter > 0)
+    ci->last_flush_time += (random() % config_write_jitter);
+
+  ci->flags &= ~(CI_FLAGS_IN_QUEUE);
+}
+
+/*
+ * enqueue_cache_item:
+ * `cache_lock' must be acquired before calling this function!
+ */
+static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
+    queue_side_t side)
+{
+  int did_insert = 0;
+
+  if (ci == NULL)
+    return (-1);
+
+  if (ci->values_num == 0)
+    return (0);
+
+  if (side == HEAD)
+  {
+    if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+    {
+      assert (ci->next == NULL);
+      ci->next = cache_queue_head;
+      cache_queue_head = ci;
+
+      if (cache_queue_tail == NULL)
+        cache_queue_tail = cache_queue_head;
+
+      did_insert = 1;
+    }
+    else if (cache_queue_head == ci)
+    {
+      /* do nothing */
+    }
+    else /* enqueued, but not first entry */
+    {
+      cache_item_t *prev;
+
+      /* find previous entry */
+      for (prev = cache_queue_head; prev != NULL; prev = prev->next)
+        if (prev->next == ci)
+          break;
+      assert (prev != NULL);
+
+      /* move to the front */
+      prev->next = ci->next;
+      ci->next = cache_queue_head;
+      cache_queue_head = ci;
+
+      /* check if we need to adapt the tail */
+      if (cache_queue_tail == ci)
+        cache_queue_tail = prev;
+    }
+  }
+  else /* (side == TAIL) */
+  {
+    /* We don't move values back in the list.. */
+    if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+      return (0);
+
+    assert (ci->next == NULL);
+
+    if (cache_queue_tail == NULL)
+      cache_queue_head = ci;
+    else
+      cache_queue_tail->next = ci;
+    cache_queue_tail = ci;
+
+    did_insert = 1;
+  }
+
+  ci->flags |= CI_FLAGS_IN_QUEUE;
+
+  if (did_insert)
+  {
+    pthread_mutex_lock (&stats_lock);
+    stats_queue_length++;
+    pthread_mutex_unlock (&stats_lock);
+  }
+
+  return (0);
+} /* }}} int enqueue_cache_item */
+
+/*
+ * tree_callback_flush:
+ * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
+ * while this is in progress.
+ */
+static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
+    gpointer data)
+{
+  cache_item_t *ci;
+  callback_flush_data_t *cfd;
+
+  ci = (cache_item_t *) value;
+  cfd = (callback_flush_data_t *) data;
+
+  if ((ci->last_flush_time <= cfd->abs_timeout)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+  }
+  else if ((do_shutdown != 0)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+  }
+  else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num <= 0))
+  {
+    char **temp;
+
+    temp = (char **) realloc (cfd->keys,
+        sizeof (char *) * (cfd->keys_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
+      return (FALSE);
+    }
+    cfd->keys = temp;
+    /* Make really sure this points to the _same_ place */
+    assert ((char *) key == ci->file);
+    cfd->keys[cfd->keys_num] = (char *) key;
+    cfd->keys_num++;
+  }
+
+  return (FALSE);
+} /* }}} gboolean tree_callback_flush */
+
+static int flush_old_values (int max_age)
+{
+  callback_flush_data_t cfd;
+  size_t k;
+
+  memset (&cfd, 0, sizeof (cfd));
+  /* Pass the current time as user data so that we don't need to call
+   * `time' for each node. */
+  cfd.now = time (NULL);
+  cfd.keys = NULL;
+  cfd.keys_num = 0;
+
+  if (max_age > 0)
+    cfd.abs_timeout = cfd.now - max_age;
+  else
+    cfd.abs_timeout = cfd.now + 1;
+
+  /* `tree_callback_flush' will return the keys of all values that haven't
+   * been touched in the last `config_flush_interval' seconds in `cfd'.
+   * The char*'s in this array point to the same memory as ci->file, so we
+   * don't need to free them separately. */
+  g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
+
+  for (k = 0; k < cfd.keys_num; k++)
+  {
+    cache_item_t *ci;
+
+    /* This must not fail. */
+    ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
+    assert (ci != NULL);
+
+    /* If we end up here with values available, something's seriously
+     * messed up. */
+    assert (ci->values_num == 0);
+
+    /* Remove the node from the tree */
+    g_tree_remove (cache_tree, cfd.keys[k]);
+    cfd.keys[k] = NULL;
+
+    /* Now free and clean up `ci'. */
+    free (ci->file);
+    ci->file = NULL;
+    free (ci);
+    ci = NULL;
+  } /* for (k = 0; k < cfd.keys_num; k++) */
+
+  if (cfd.keys != NULL)
+  {
+    free (cfd.keys);
+    cfd.keys = NULL;
+  }
+
+  return (0);
+} /* int flush_old_values */
+
+static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct timeval now;
+  struct timespec next_flush;
+
+  gettimeofday (&now, NULL);
+  next_flush.tv_sec = now.tv_sec + config_flush_interval;
+  next_flush.tv_nsec = 1000 * now.tv_usec;
+
+  pthread_mutex_lock (&cache_lock);
+  while ((do_shutdown == 0) || (cache_queue_head != NULL))
+  {
+    cache_item_t *ci;
+    char *file;
+    char **values;
+    int values_num;
+    int status;
+    int i;
+
+    /* First, check if it's time to do the cache flush. */
+    gettimeofday (&now, NULL);
+    if ((now.tv_sec > next_flush.tv_sec)
+        || ((now.tv_sec == next_flush.tv_sec)
+          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
+    {
+      /* Flush all values that haven't been written in the last
+       * `config_write_interval' seconds. */
+      flush_old_values (config_write_interval);
+
+      /* Determine the time of the next cache flush. */
+      while (next_flush.tv_sec <= now.tv_sec)
+        next_flush.tv_sec += config_flush_interval;
+
+      /* unlock the cache while we rotate so we don't block incoming
+       * updates if the fsync() blocks on disk I/O */
+      pthread_mutex_unlock(&cache_lock);
+      journal_rotate();
+      pthread_mutex_lock(&cache_lock);
+    }
+
+    /* Now, check if there's something to store away. If not, wait until
+     * something comes in or it's time to do the cache flush. */
+    if (cache_queue_head == NULL)
+    {
+      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
+      if ((status != 0) && (status != ETIMEDOUT))
+      {
+        RRDD_LOG (LOG_ERR, "queue_thread_main: "
+            "pthread_cond_timedwait returned %i.", status);
+      }
+    }
+
+    /* We're about to shut down, so lets flush the entire tree. */
+    if ((do_shutdown != 0) && (cache_queue_head == NULL))
+      flush_old_values (/* max age = */ -1);
+
+    /* Check if a value has arrived. This may be NULL if we timed out or there
+     * was an interrupt such as a signal. */
+    if (cache_queue_head == NULL)
+      continue;
+
+    ci = cache_queue_head;
+
+    /* copy the relevant parts */
+    file = strdup (ci->file);
+    if (file == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
+      continue;
+    }
+
+    assert(ci->values != NULL);
+    assert(ci->values_num > 0);
+
+    values = ci->values;
+    values_num = ci->values_num;
+
+    _wipe_ci_values(ci, time(NULL));
+
+    cache_queue_head = ci->next;
+    if (cache_queue_head == NULL)
+      cache_queue_tail = NULL;
+    ci->next = NULL;
+
+    pthread_mutex_lock (&stats_lock);
+    assert (stats_queue_length > 0);
+    stats_queue_length--;
+    pthread_mutex_unlock (&stats_lock);
+
+    pthread_mutex_unlock (&cache_lock);
+
+    rrd_clear_error ();
+    status = rrd_update_r (file, NULL, values_num, (void *) values);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
+          "rrd_update_r (%s) failed with status %i. (%s)",
+          file, status, rrd_get_error());
+    }
+
+    journal_write("wrote", file);
+
+    for (i = 0; i < values_num; i++)
+      free (values[i]);
+
+    free(values);
+    free(file);
+
+    if (status == 0)
+    {
+      pthread_mutex_lock (&stats_lock);
+      stats_updates_written++;
+      stats_data_sets_written += values_num;
+      pthread_mutex_unlock (&stats_lock);
+    }
+
+    pthread_mutex_lock (&cache_lock);
+    pthread_cond_broadcast (&flush_cond);
+
+    /* We're about to shut down, so lets flush the entire tree. */
+    if ((do_shutdown != 0) && (cache_queue_head == NULL))
+      flush_old_values (/* max age = */ -1);
+  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
+  pthread_mutex_unlock (&cache_lock);
+
+  assert(cache_queue_head == NULL);
+  RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
+  journal_done();
+
+  return (NULL);
+} /* }}} void *queue_thread_main */
+
+static int buffer_get_field (char **buffer_ret, /* {{{ */
+    size_t *buffer_size_ret, char **field_ret)
+{
+  char *buffer;
+  size_t buffer_pos;
+  size_t buffer_size;
+  char *field;
+  size_t field_size;
+  int status;
+
+  buffer = *buffer_ret;
+  buffer_pos = 0;
+  buffer_size = *buffer_size_ret;
+  field = *buffer_ret;
+  field_size = 0;
+
+  if (buffer_size <= 0)
+    return (-1);
+
+  /* This is ensured by `handle_request'. */
+  assert (buffer[buffer_size - 1] == '\0');
+
+  status = -1;
+  while (buffer_pos < buffer_size)
+  {
+    /* Check for end-of-field or end-of-buffer */
+    if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
+    {
+      field[field_size] = 0;
+      field_size++;
+      buffer_pos++;
+      status = 0;
+      break;
+    }
+    /* Handle escaped characters. */
+    else if (buffer[buffer_pos] == '\\')
+    {
+      if (buffer_pos >= (buffer_size - 1))
+        break;
+      buffer_pos++;
+      field[field_size] = buffer[buffer_pos];
+      field_size++;
+      buffer_pos++;
+    }
+    /* Normal operation */ 
+    else
+    {
+      field[field_size] = buffer[buffer_pos];
+      field_size++;
+      buffer_pos++;
+    }
+  } /* while (buffer_pos < buffer_size) */
+
+  if (status != 0)
+    return (status);
+
+  *buffer_ret = buffer + buffer_pos;
+  *buffer_size_ret = buffer_size - buffer_pos;
+  *field_ret = field;
+
+  return (0);
+} /* }}} int buffer_get_field */
+
+static int flush_file (const char *filename) /* {{{ */
+{
+  cache_item_t *ci;
+
+  pthread_mutex_lock (&cache_lock);
+
+  ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
+  if (ci == NULL)
+  {
+    pthread_mutex_unlock (&cache_lock);
+    return (ENOENT);
+  }
+
+  /* Enqueue at head */
+  enqueue_cache_item (ci, HEAD);
+  pthread_cond_signal (&cache_cond);
+
+  while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+  {
+    ci = NULL;
+
+    pthread_cond_wait (&flush_cond, &cache_lock);
+
+    ci = g_tree_lookup (cache_tree, filename);
+    if (ci == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
+          "while waiting for flush.");
+      pthread_mutex_unlock (&cache_lock);
+      return (-1);
+    }
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+  return (0);
+} /* }}} int flush_file */
+
+static int handle_request_help (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  int status;
+  char **help_text;
+  size_t help_text_len;
+  char *command;
+  size_t i;
+
+  char *help_help[] =
+  {
+    "4 Command overview\n",
+    "FLUSH <filename>\n",
+    "HELP [<command>]\n",
+    "UPDATE <filename> <values> [<values> ...]\n",
+    "STATS\n"
+  };
+  size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
+
+  char *help_flush[] =
+  {
+    "4 Help for FLUSH\n",
+    "Usage: FLUSH <filename>\n",
+    "\n",
+    "Adds the given filename to the head of the update queue and returns\n",
+    "after is has been dequeued.\n"
+  };
+  size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
+
+  char *help_update[] =
+  {
+    "9 Help for UPDATE\n",
+    "Usage: UPDATE <filename> <values> [<values> ...]\n"
+    "\n",
+    "Adds the given file to the internal cache if it is not yet known and\n",
+    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
+    "for details.\n",
+    "\n",
+    "Each <values> has the following form:\n",
+    "  <values> = <time>:<value>[:<value>[...]]\n",
+    "See the rrdupdate(1) manpage for details.\n"
+  };
+  size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
+
+  char *help_stats[] =
+  {
+    "4 Help for STATS\n",
+    "Usage: STATS\n",
+    "\n",
+    "Returns some performance counters, see the rrdcached(1) manpage for\n",
+    "a description of the values.\n"
+  };
+  size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
+
+  status = buffer_get_field (&buffer, &buffer_size, &command);
+  if (status != 0)
+  {
+    help_text = help_help;
+    help_text_len = help_help_len;
+  }
+  else
+  {
+    if (strcasecmp (command, "update") == 0)
+    {
+      help_text = help_update;
+      help_text_len = help_update_len;
+    }
+    else if (strcasecmp (command, "flush") == 0)
+    {
+      help_text = help_flush;
+      help_text_len = help_flush_len;
+    }
+    else if (strcasecmp (command, "stats") == 0)
+    {
+      help_text = help_stats;
+      help_text_len = help_stats_len;
+    }
+    else
+    {
+      help_text = help_help;
+      help_text_len = help_help_len;
+    }
+  }
+
+  for (i = 0; i < help_text_len; i++)
+  {
+    status = swrite (fd, help_text[i], strlen (help_text[i]));
+    if (status < 0)
+    {
+      status = errno;
+      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
+      return (status);
+    }
+  }
+
+  return (0);
+} /* }}} int handle_request_help */
+
+static int handle_request_stats (int fd, /* {{{ */
+    char *buffer __attribute__((unused)),
+    size_t buffer_size __attribute__((unused)))
+{
+  int status;
+  char outbuf[CMD_MAX];
+
+  uint64_t copy_queue_length;
+  uint64_t copy_updates_received;
+  uint64_t copy_flush_received;
+  uint64_t copy_updates_written;
+  uint64_t copy_data_sets_written;
+  uint64_t copy_journal_bytes;
+  uint64_t copy_journal_rotate;
+
+  uint64_t tree_nodes_number;
+  uint64_t tree_depth;
+
+  pthread_mutex_lock (&stats_lock);
+  copy_queue_length       = stats_queue_length;
+  copy_updates_received   = stats_updates_received;
+  copy_flush_received     = stats_flush_received;
+  copy_updates_written    = stats_updates_written;
+  copy_data_sets_written  = stats_data_sets_written;
+  copy_journal_bytes      = stats_journal_bytes;
+  copy_journal_rotate     = stats_journal_rotate;
+  pthread_mutex_unlock (&stats_lock);
+
+  pthread_mutex_lock (&cache_lock);
+  tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
+  tree_depth        = (uint64_t) g_tree_height (cache_tree);
+  pthread_mutex_unlock (&cache_lock);
+
+#define RRDD_STATS_SEND \
+  outbuf[sizeof (outbuf) - 1] = 0; \
+  status = swrite (fd, outbuf, strlen (outbuf)); \
+  if (status < 0) \
+  { \
+    status = errno; \
+    RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
+    return (status); \
+  }
+
+  strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "QueueLength: %"PRIu64"\n", copy_queue_length);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "FlushesReceived: %"PRIu64"\n", copy_flush_received);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "TreeDepth: %"PRIu64"\n", tree_depth);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof(outbuf),
+      "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof(outbuf),
+      "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
+  RRDD_STATS_SEND;
+
+  return (0);
+#undef RRDD_STATS_SEND
+} /* }}} int handle_request_stats */
+
+static int handle_request_flush (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  char *file;
+  int status;
+  char result[CMD_MAX];
+
+  status = buffer_get_field (&buffer, &buffer_size, &file);
+  if (status != 0)
+  {
+    strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
+  }
+  else
+  {
+    pthread_mutex_lock(&stats_lock);
+    stats_flush_received++;
+    pthread_mutex_unlock(&stats_lock);
+
+    status = flush_file (file);
+    if (status == 0)
+      snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
+    else if (status == ENOENT)
+    {
+      /* no file in our tree; see whether it exists at all */
+      struct stat statbuf;
+
+      memset(&statbuf, 0, sizeof(statbuf));
+      if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
+        snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
+      else
+        snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
+    }
+    else if (status < 0)
+      strncpy (result, "-1 Internal error.\n", sizeof (result));
+    else
+      snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
+  }
+  result[sizeof (result) - 1] = 0;
+
+  status = swrite (fd, result, strlen (result));
+  if (status < 0)
+  {
+    status = errno;
+    RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
+    return (status);
+  }
+
+  return (0);
+} /* }}} int handle_request_flush */
+
+static int handle_request_update (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  char *file;
+  int values_num = 0;
+  int status;
+
+  time_t now;
+
+  cache_item_t *ci;
+  char answer[CMD_MAX];
+
+#define RRDD_UPDATE_SEND \
+  answer[sizeof (answer) - 1] = 0; \
+  status = swrite (fd, answer, strlen (answer)); \
+  if (status < 0) \
+  { \
+    status = errno; \
+    RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
+    return (status); \
+  }
+
+  now = time (NULL);
+
+  status = buffer_get_field (&buffer, &buffer_size, &file);
+  if (status != 0)
+  {
+    strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
+        sizeof (answer));
+    RRDD_UPDATE_SEND;
+    return (0);
+  }
+
+  pthread_mutex_lock(&stats_lock);
+  stats_updates_received++;
+  pthread_mutex_unlock(&stats_lock);
+
+  pthread_mutex_lock (&cache_lock);
+
+  ci = g_tree_lookup (cache_tree, file);
+  if (ci == NULL) /* {{{ */
+  {
+    struct stat statbuf;
+
+    memset (&statbuf, 0, sizeof (statbuf));
+    status = stat (file, &statbuf);
+    if (status != 0)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
+
+      status = errno;
+      if (status == ENOENT)
+        snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
+      else
+        snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
+            status);
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+    if (!S_ISREG (statbuf.st_mode))
+    {
+      pthread_mutex_unlock (&cache_lock);
+
+      snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+    if (access(file, R_OK|W_OK) != 0)
+    {
+      pthread_mutex_unlock (&cache_lock);
+
+      snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
+                file, rrd_strerror(errno));
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+
+    ci = (cache_item_t *) malloc (sizeof (cache_item_t));
+    if (ci == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+
+      strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+    memset (ci, 0, sizeof (cache_item_t));
+
+    ci->file = strdup (file);
+    if (ci->file == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      free (ci);
+      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+
+      strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+
+    _wipe_ci_values(ci, now);
+    ci->flags = CI_FLAGS_IN_TREE;
+
+    g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
+  } /* }}} */
+  assert (ci != NULL);
+
+  while (buffer_size > 0)
+  {
+    char **temp;
+    char *value;
+
+    status = buffer_get_field (&buffer, &buffer_size, &value);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
+      break;
+    }
+
+    temp = (char **) realloc (ci->values,
+        sizeof (char *) * (ci->values_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
+      continue;
+    }
+    ci->values = temp;
+
+    ci->values[ci->values_num] = strdup (value);
+    if (ci->values[ci->values_num] == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+      continue;
+    }
+    ci->values_num++;
+
+    values_num++;
+  }
+
+  if (((now - ci->last_flush_time) >= config_write_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+    pthread_cond_signal (&cache_cond);
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+
+  if (values_num < 1)
+  {
+    strncpy (answer, "-1 No values updated.\n", sizeof (answer));
+  }
+  else
+  {
+    snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
+        (values_num == 1) ? "" : "s");
+  }
+  RRDD_UPDATE_SEND;
+  return (0);
+#undef RRDD_UPDATE_SEND
+} /* }}} int handle_request_update */
+
+/* we came across a "WROTE" entry during journal replay.
+ * throw away any values that we have accumulated for this file
+ */
+static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
+                                 const char *buffer,
+                                 size_t buffer_size __attribute__((unused)))
+{
+  int i;
+  cache_item_t *ci;
+  const char *file = buffer;
+
+  pthread_mutex_lock(&cache_lock);
+
+  ci = g_tree_lookup(cache_tree, file);
+  if (ci == NULL)
+  {
+    pthread_mutex_unlock(&cache_lock);
+    return (0);
+  }
+
+  if (ci->values)
+  {
+    for (i=0; i < ci->values_num; i++)
+      free(ci->values[i]);
+
+    free(ci->values);
+  }
+
+  _wipe_ci_values(ci, time(NULL));
+
+  pthread_mutex_unlock(&cache_lock);
+  return (0);
+} /* }}} int handle_request_wrote */
+
+/* if fd < 0, we are in journal replay mode */
+static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */
+{
+  char *buffer_ptr;
+  char *command;
+  int status;
+
+  assert (buffer[buffer_size - 1] == '\0');
+
+  buffer_ptr = buffer;
+  command = NULL;
+  status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
+    return (-1);
+  }
+
+  if (strcasecmp (command, "update") == 0)
+  {
+    /* don't re-write updates in replay mode */
+    if (fd >= 0)
+      journal_write(command, buffer_ptr);
+
+    return (handle_request_update (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcasecmp (command, "wrote") == 0 && fd < 0)
+  {
+    /* this is only valid in replay mode */
+    return (handle_request_wrote (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcasecmp (command, "flush") == 0)
+  {
+    return (handle_request_flush (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcasecmp (command, "stats") == 0)
+  {
+    return (handle_request_stats (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcasecmp (command, "help") == 0)
+  {
+    return (handle_request_help (fd, buffer_ptr, buffer_size));
+  }
+  else
+  {
+    char result[CMD_MAX];
+
+    snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
+    result[sizeof (result) - 1] = 0;
+
+    status = swrite (fd, result, strlen (result));
+    if (status < 0)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
+      return (-1);
+    }
+  }
+
+  return (0);
+} /* }}} int handle_request */
+
+/* MUST NOT hold journal_lock before calling this */
+static void journal_rotate(void) /* {{{ */
+{
+  FILE *old_fh = NULL;
+
+  if (journal_cur == NULL || journal_old == NULL)
+    return;
+
+  pthread_mutex_lock(&journal_lock);
+
+  /* we rotate this way (rename before close) so that the we can release
+   * the journal lock as fast as possible.  Journal writes to the new
+   * journal can proceed immediately after the new file is opened.  The
+   * fclose can then block without affecting new updates.
+   */
+  if (journal_fh != NULL)
+  {
+    old_fh = journal_fh;
+    rename(journal_cur, journal_old);
+    ++stats_journal_rotate;
+  }
+
+  journal_fh = fopen(journal_cur, "a");
+  pthread_mutex_unlock(&journal_lock);
+
+  if (old_fh != NULL)
+    fclose(old_fh);
+
+  if (journal_fh == NULL)
+    RRDD_LOG(LOG_CRIT,
+             "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
+             journal_cur, rrd_strerror(errno));
+
+} /* }}} static void journal_rotate */
+
+static void journal_done(void) /* {{{ */
+{
+  if (journal_cur == NULL)
+    return;
+
+  pthread_mutex_lock(&journal_lock);
+  if (journal_fh != NULL)
+  {
+    fclose(journal_fh);
+    journal_fh = NULL;
+  }
+
+  RRDD_LOG(LOG_INFO, "removing journals");
+
+  unlink(journal_old);
+  unlink(journal_cur);
+  pthread_mutex_unlock(&journal_lock);
+
+} /* }}} static void journal_done */
+
+static int journal_write(char *cmd, char *args) /* {{{ */
+{
+  int chars;
+
+  if (journal_fh == NULL)
+    return 0;
+
+  pthread_mutex_lock(&journal_lock);
+  chars = fprintf(journal_fh, "%s %s\n", cmd, args);
+  pthread_mutex_unlock(&journal_lock);
+
+  if (chars > 0)
+  {
+    pthread_mutex_lock(&stats_lock);
+    stats_journal_bytes += chars;
+    pthread_mutex_unlock(&stats_lock);
+  }
+
+  return chars;
+} /* }}} static int journal_write */
+
+static int journal_replay (const char *file) /* {{{ */
+{
+  FILE *fh;
+  int entry_cnt = 0;
+  int fail_cnt = 0;
+  uint64_t line = 0;
+  char entry[CMD_MAX];
+
+  if (file == NULL) return 0;
+
+  fh = fopen(file, "r");
+  if (fh == NULL)
+  {
+    if (errno != ENOENT)
+      RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
+               file, rrd_strerror(errno));
+    return 0;
+  }
+  else
+    RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
+
+  while(!feof(fh))
+  {
+    size_t entry_len;
+
+    ++line;
+    fgets(entry, sizeof(entry), fh);
+    entry_len = strlen(entry);
+
+    /* check \n termination in case journal writing crashed mid-line */
+    if (entry_len == 0)
+      continue;
+    else if (entry[entry_len - 1] != '\n')
+    {
+      RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
+      ++fail_cnt;
+      continue;
+    }
+
+    entry[entry_len - 1] = '\0';
+
+    if (handle_request(-1, entry, entry_len) == 0)
+      ++entry_cnt;
+    else
+      ++fail_cnt;
+  }
+
+  fclose(fh);
+
+  if (entry_cnt > 0)
+  {
+    RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
+             entry_cnt, fail_cnt);
+    return 1;
+  }
+  else
+    return 0;
+
+} /* }}} static int journal_replay */
+
+static void *connection_thread_main (void *args) /* {{{ */
+{
+  pthread_t self;
+  int i;
+  int fd;
+  
+  fd = *((int *) args);
+  free (args);
+
+  pthread_mutex_lock (&connection_threads_lock);
+  {
+    pthread_t *temp;
+
+    temp = (pthread_t *) realloc (connection_threads,
+        sizeof (pthread_t) * (connection_threads_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
+    }
+    else
+    {
+      connection_threads = temp;
+      connection_threads[connection_threads_num] = pthread_self ();
+      connection_threads_num++;
+    }
+  }
+  pthread_mutex_unlock (&connection_threads_lock);
+
+  while (do_shutdown == 0)
+  {
+    char buffer[CMD_MAX];
+
+    struct pollfd pollfd;
+    int status;
+
+    pollfd.fd = fd;
+    pollfd.events = POLLIN | POLLPRI;
+    pollfd.revents = 0;
+
+    status = poll (&pollfd, 1, /* timeout = */ 500);
+    if (status == 0) /* timeout */
+      continue;
+    else if (status < 0) /* error */
+    {
+      status = errno;
+      if (status == EINTR)
+        continue;
+      RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
+      continue;
+    }
+
+    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
+    {
+      close (fd);
+      break;
+    }
+    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
+    {
+      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
+          "poll(2) returned something unexpected: %#04hx",
+          pollfd.revents);
+      close (fd);
+      break;
+    }
+
+    status = (int) sread (fd, buffer, sizeof (buffer));
+    if (status <= 0)
+    {
+      close (fd);
+
+      if (status < 0)
+        RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
+
+      break;
+    }
+
+    status = handle_request (fd, buffer, /*buffer_size=*/ status);
+    if (status != 0)
+    {
+      close (fd);
+      break;
+    }
+  }
+
+  self = pthread_self ();
+  /* Remove this thread from the connection threads list */
+  pthread_mutex_lock (&connection_threads_lock);
+  /* Find out own index in the array */
+  for (i = 0; i < connection_threads_num; i++)
+    if (pthread_equal (connection_threads[i], self) != 0)
+      break;
+  assert (i < connection_threads_num);
+
+  /* Move the trailing threads forward. */
+  if (i < (connection_threads_num - 1))
+  {
+    memmove (connection_threads + i,
+        connection_threads + i + 1,
+        sizeof (pthread_t) * (connection_threads_num - i - 1));
+  }
+
+  connection_threads_num--;
+  pthread_mutex_unlock (&connection_threads_lock);
+
+  return (NULL);
+} /* }}} void *connection_thread_main */
+
+static int open_listen_socket_unix (const char *path) /* {{{ */
+{
+  int fd;
+  struct sockaddr_un sa;
+  listen_socket_t *temp;
+  int status;
+
+  temp = (listen_socket_t *) realloc (listen_fds,
+      sizeof (listen_fds[0]) * (listen_fds_num + 1));
+  if (temp == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
+    return (-1);
+  }
+  listen_fds = temp;
+  memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+  fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
+  if (fd < 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
+    return (-1);
+  }
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sun_family = AF_UNIX;
+  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
+
+  status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+
+  status = listen (fd, /* backlog = */ 10);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+  
+  listen_fds[listen_fds_num].fd = fd;
+  snprintf (listen_fds[listen_fds_num].path,
+      sizeof (listen_fds[listen_fds_num].path) - 1,
+      "unix:%s", path);
+  listen_fds_num++;
+
+  return (0);
+} /* }}} int open_listen_socket_unix */
+
+static int open_listen_socket (const char *addr) /* {{{ */
+{
+  struct addrinfo ai_hints;
+  struct addrinfo *ai_res;
+  struct addrinfo *ai_ptr;
+  int status;
+
+  assert (addr != NULL);
+
+  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
+    return (open_listen_socket_unix (addr + strlen ("unix:")));
+  else if (addr[0] == '/')
+    return (open_listen_socket_unix (addr));
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+
+  ai_res = NULL;
+  status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
+        "%s", addr, gai_strerror (status));
+    return (-1);
+  }
+
+  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    int fd;
+    listen_socket_t *temp;
+
+    temp = (listen_socket_t *) realloc (listen_fds,
+        sizeof (listen_fds[0]) * (listen_fds_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
+      continue;
+    }
+    listen_fds = temp;
+    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
+      continue;
+    }
+
+    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
+      close (fd);
+      continue;
+    }
+
+    status = listen (fd, /* backlog = */ 10);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
+      close (fd);
+      return (-1);
+    }
+
+    listen_fds[listen_fds_num].fd = fd;
+    strncpy (listen_fds[listen_fds_num].path, addr,
+        sizeof (listen_fds[listen_fds_num].path) - 1);
+    listen_fds_num++;
+  } /* for (ai_ptr) */
+
+  return (0);
+} /* }}} int open_listen_socket */
+
+static int close_listen_sockets (void) /* {{{ */
+{
+  size_t i;
+
+  for (i = 0; i < listen_fds_num; i++)
+  {
+    close (listen_fds[i].fd);
+    if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
+      unlink (listen_fds[i].path + strlen ("unix:"));
+  }
+
+  free (listen_fds);
+  listen_fds = NULL;
+  listen_fds_num = 0;
+
+  return (0);
+} /* }}} int close_listen_sockets */
+
+static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct pollfd *pollfds;
+  int pollfds_num;
+  int status;
+  int i;
+
+  for (i = 0; i < config_listen_address_list_len; i++)
+    open_listen_socket (config_listen_address_list[i]);
+
+  if (config_listen_address_list_len < 1)
+    open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
+
+  if (listen_fds_num < 1)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
+        "could be opened. Sorry.");
+    return (NULL);
+  }
+
+  pollfds_num = listen_fds_num;
+  pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
+  if (pollfds == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+    return (NULL);
+  }
+  memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
+
+  RRDD_LOG(LOG_INFO, "listening for connections");
+
+  while (do_shutdown == 0)
+  {
+    assert (pollfds_num == ((int) listen_fds_num));
+    for (i = 0; i < pollfds_num; i++)
+    {
+      pollfds[i].fd = listen_fds[i].fd;
+      pollfds[i].events = POLLIN | POLLPRI;
+      pollfds[i].revents = 0;
+    }
+
+    status = poll (pollfds, pollfds_num, /* timeout = */ -1);
+    if (status < 1)
+    {
+      status = errno;
+      if (status != EINTR)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
+      }
+      continue;
+    }
+
+    for (i = 0; i < pollfds_num; i++)
+    {
+      int *client_sd;
+      struct sockaddr_storage client_sa;
+      socklen_t client_sa_size;
+      pthread_t tid;
+      pthread_attr_t attr;
+
+      if (pollfds[i].revents == 0)
+        continue;
+
+      if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: "
+            "poll(2) returned something unexpected for listen FD #%i.",
+            pollfds[i].fd);
+        continue;
+      }
+
+      client_sd = (int *) malloc (sizeof (int));
+      if (client_sd == NULL)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+        continue;
+      }
+
+      client_sa_size = sizeof (client_sa);
+      *client_sd = accept (pollfds[i].fd,
+          (struct sockaddr *) &client_sa, &client_sa_size);
+      if (*client_sd < 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
+        continue;
+      }
+
+      pthread_attr_init (&attr);
+      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+      status = pthread_create (&tid, &attr, connection_thread_main,
+          /* args = */ (void *) client_sd);
+      if (status != 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
+        close (*client_sd);
+        free (client_sd);
+        continue;
+      }
+    } /* for (pollfds_num) */
+  } /* while (do_shutdown == 0) */
+
+  RRDD_LOG(LOG_INFO, "starting shutdown");
+
+  close_listen_sockets ();
+
+  pthread_mutex_lock (&connection_threads_lock);
+  while (connection_threads_num > 0)
+  {
+    pthread_t wait_for;
+
+    wait_for = connection_threads[0];
+
+    pthread_mutex_unlock (&connection_threads_lock);
+    pthread_join (wait_for, /* retval = */ NULL);
+    pthread_mutex_lock (&connection_threads_lock);
+  }
+  pthread_mutex_unlock (&connection_threads_lock);
+
+  return (NULL);
+} /* }}} void *listen_thread_main */
+
+static int daemonize (void) /* {{{ */
+{
+  int status;
+
+  /* These structures are static, because `sigaction' behaves weird if the are
+   * overwritten.. */
+  static struct sigaction sa_int;
+  static struct sigaction sa_term;
+  static struct sigaction sa_pipe;
+
+  if (!stay_foreground)
+  {
+    pid_t child;
+    char *base_dir;
+
+    child = fork ();
+    if (child < 0)
+    {
+      fprintf (stderr, "daemonize: fork(2) failed.\n");
+      return (-1);
+    }
+    else if (child > 0)
+    {
+      return (1);
+    }
+
+    /* Change into the /tmp directory. */
+    base_dir = (config_base_dir != NULL)
+      ? config_base_dir
+      : "/tmp";
+    status = chdir (base_dir);
+    if (status != 0)
+    {
+      fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
+      return (-1);
+    }
+
+    /* Become session leader */
+    setsid ();
+
+    /* Open the first three file descriptors to /dev/null */
+    close (2);
+    close (1);
+    close (0);
+
+    open ("/dev/null", O_RDWR);
+    dup (0);
+    dup (0);
+  } /* if (!stay_foreground) */
+
+  /* Install signal handlers */
+  memset (&sa_int, 0, sizeof (sa_int));
+  sa_int.sa_handler = sig_int_handler;
+  sigaction (SIGINT, &sa_int, NULL);
+
+  memset (&sa_term, 0, sizeof (sa_term));
+  sa_term.sa_handler = sig_term_handler;
+  sigaction (SIGTERM, &sa_term, NULL);
+
+  memset (&sa_pipe, 0, sizeof (sa_pipe));
+  sa_pipe.sa_handler = SIG_IGN;
+  sigaction (SIGPIPE, &sa_pipe, NULL);
+
+  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
+  RRDD_LOG(LOG_INFO, "starting up");
+
+  cache_tree = g_tree_new ((GCompareFunc) strcmp);
+  if (cache_tree == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
+    return (-1);
+  }
+
+  status = write_pidfile ();
+  return status;
+} /* }}} int daemonize */
+
+static int cleanup (void) /* {{{ */
+{
+  do_shutdown++;
+
+  pthread_cond_signal (&cache_cond);
+  pthread_join (queue_thread, /* return = */ NULL);
+
+  remove_pidfile ();
+
+  RRDD_LOG(LOG_INFO, "goodbye");
+  closelog ();
+
+  return (0);
+} /* }}} int cleanup */
+
+static int read_options (int argc, char **argv) /* {{{ */
+{
+  int option;
+  int status = 0;
+
+  while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1)
+  {
+    switch (option)
+    {
+      case 'g':
+        stay_foreground=1;
+        break;
+
+      case 'l':
+      {
+        char **temp;
+
+        temp = (char **) realloc (config_listen_address_list,
+            sizeof (char *) * (config_listen_address_list_len + 1));
+        if (temp == NULL)
+        {
+          fprintf (stderr, "read_options: realloc failed.\n");
+          return (2);
+        }
+        config_listen_address_list = temp;
+
+        temp[config_listen_address_list_len] = strdup (optarg);
+        if (temp[config_listen_address_list_len] == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (2);
+        }
+        config_listen_address_list_len++;
+      }
+      break;
+
+      case 'f':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_flush_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid flush interval: %s\n", optarg);
+          status = 3;
+        }
+      }
+      break;
+
+      case 'w':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_write_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid write interval: %s\n", optarg);
+          status = 2;
+        }
+      }
+      break;
+
+      case 'z':
+      {
+        int temp;
+
+        temp = atoi(optarg);
+        if (temp > 0)
+          config_write_jitter = temp;
+        else
+        {
+          fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
+          status = 2;
+        }
+
+        break;
+      }
+
+      case 'b':
+      {
+        size_t len;
+
+        if (config_base_dir != NULL)
+          free (config_base_dir);
+        config_base_dir = strdup (optarg);
+        if (config_base_dir == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (3);
+        }
+
+        len = strlen (config_base_dir);
+        while ((len > 0) && (config_base_dir[len - 1] == '/'))
+        {
+          config_base_dir[len - 1] = 0;
+          len--;
+        }
+
+        if (len < 1)
+        {
+          fprintf (stderr, "Invalid base directory: %s\n", optarg);
+          return (4);
+        }
+      }
+      break;
+
+      case 'p':
+      {
+        if (config_pid_file != NULL)
+          free (config_pid_file);
+        config_pid_file = strdup (optarg);
+        if (config_pid_file == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (3);
+        }
+      }
+      break;
+
+      case 'j':
+      {
+        struct stat statbuf;
+        const char *dir = optarg;
+
+        status = stat(dir, &statbuf);
+        if (status != 0)
+        {
+          fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
+          return 6;
+        }
+
+        if (!S_ISDIR(statbuf.st_mode)
+            || access(dir, R_OK|W_OK|X_OK) != 0)
+        {
+          fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
+                  errno ? rrd_strerror(errno) : "");
+          return 6;
+        }
+
+        journal_cur = malloc(PATH_MAX + 1);
+        journal_old = malloc(PATH_MAX + 1);
+        if (journal_cur == NULL || journal_old == NULL)
+        {
+          fprintf(stderr, "malloc failure for journal files\n");
+          return 6;
+        }
+        else 
+        {
+          snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
+          snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
+        }
+      }
+      break;
+
+      case 'h':
+      case '?':
+        printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
+            "\n"
+            "Usage: rrdcached [options]\n"
+            "\n"
+            "Valid options are:\n"
+            "  -l <address>  Socket address to listen to.\n"
+            "  -w <seconds>  Interval in which to write data.\n"
+            "  -z <delay>    Delay writes up to <delay> seconds to spread load" \
+            "  -f <seconds>  Interval in which to flush dead data.\n"
+            "  -p <file>     Location of the PID-file.\n"
+            "  -b <dir>      Base directory to change to.\n"
+            "\n"
+            "For more information and a detailed description of all options "
+            "please refer\n"
+            "to the rrdcached(1) manual page.\n",
+            VERSION);
+        status = -1;
+        break;
+    } /* switch (option) */
+  } /* while (getopt) */
+
+  /* advise the user when values are not sane */
+  if (config_flush_interval < 2 * config_write_interval)
+    fprintf(stderr, "WARNING: flush interval (-f) should be at least"
+            " 2x write interval (-w) !\n");
+  if (config_write_jitter > config_write_interval)
+    fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
+            " write interval (-w) !\n");
+
+  return (status);
+} /* }}} int read_options */
+
+int main (int argc, char **argv)
+{
+  int status;
+
+  status = read_options (argc, argv);
+  if (status != 0)
+  {
+    if (status < 0)
+      status = 0;
+    return (status);
+  }
+
+  status = daemonize ();
+  if (status == 1)
+  {
+    struct sigaction sigchld;
+
+    memset (&sigchld, 0, sizeof (sigchld));
+    sigchld.sa_handler = SIG_IGN;
+    sigaction (SIGCHLD, &sigchld, NULL);
+
+    return (0);
+  }
+  else if (status != 0)
+  {
+    fprintf (stderr, "daemonize failed, exiting.\n");
+    return (1);
+  }
+
+  if (journal_cur != NULL)
+  {
+    int had_journal = 0;
+
+    pthread_mutex_lock(&journal_lock);
+
+    RRDD_LOG(LOG_INFO, "checking for journal files");
+
+    had_journal += journal_replay(journal_old);
+    had_journal += journal_replay(journal_cur);
+
+    if (had_journal)
+      flush_old_values(-1);
+
+    pthread_mutex_unlock(&journal_lock);
+    journal_rotate();
+
+    RRDD_LOG(LOG_INFO, "journal processing complete");
+  }
+
+  /* start the queue thread */
+  memset (&queue_thread, 0, sizeof (queue_thread));
+  status = pthread_create (&queue_thread,
+                           NULL, /* attr */
+                           queue_thread_main,
+                           NULL); /* args */
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
+    cleanup();
+    return (1);
+  }
+
+  listen_thread_main (NULL);
+  cleanup ();
+
+  return (0);
+} /* int main */
+
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */
index 093de98..a32f4fb 100644 (file)
@@ -43,6 +43,7 @@
  *****************************************************************************/
 #include "rrd_tool.h"
 #include "rrd_rpncalc.h"
+#include "rrd_client.h"
 
 #if !(defined(NETWARE) || defined(WIN32))
 extern char *tzname[2];
@@ -441,6 +442,7 @@ int rrd_dump(
 {
     int       rc;
     int       opt_noheader = 0;
+    char     *opt_daemon = NULL;
 
     /* init rrd clean */
 
@@ -451,16 +453,28 @@ int rrd_dump(
         int       opt;
         int       option_index = 0;
         static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
             {"no-header", no_argument, 0, 'n'},
             {0, 0, 0, 0}
         };
 
-        opt = getopt_long(argc, argv, "n", long_options, &option_index);
+        opt = getopt_long(argc, argv, "d:n", long_options, &option_index);
 
         if (opt == EOF)
             break;
 
         switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
         case 'n':
             opt_noheader = 1;
             break;
@@ -479,6 +493,10 @@ int rrd_dump(
         return (-1);
     }
 
+    rc = rrdc_flush_if_daemon(opt_daemon, argv[optind]);
+    if (opt_daemon) free(opt_daemon);
+    if (rc) return (rc);
+
     if ((argc - optind) == 2) {
         rc = rrd_dump_opt_r(argv[optind], argv[optind + 1], opt_noheader);
     } else {
index 5da64c5..568e262 100644 (file)
@@ -53,6 +53,7 @@
  *****************************************************************************/
 
 #include "rrd_tool.h"
+#include "rrd_client.h"
 
 #include "rrd_is_thread_safe.h"
 /*#define DEBUG*/
@@ -72,6 +73,8 @@ int rrd_fetch(
     long      step_tmp = 1;
     time_t    start_tmp = 0, end_tmp = 0;
     const char *cf;
+    char *opt_daemon = NULL;
+    int status;
 
     rrd_time_value_t start_tv, end_tv;
     char     *parsetime_error = NULL;
@@ -79,6 +82,7 @@ int rrd_fetch(
         {"resolution", required_argument, 0, 'r'},
         {"start", required_argument, 0, 's'},
         {"end", required_argument, 0, 'e'},
+        {"daemon", required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
 
@@ -93,7 +97,7 @@ int rrd_fetch(
         int       option_index = 0;
         int       opt;
 
-        opt = getopt_long(argc, argv, "r:s:e:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "r:s:e:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -114,6 +118,18 @@ int rrd_fetch(
         case 'r':
             step_tmp = atol(optarg);
             break;
+
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
         case '?':
             rrd_set_error("unknown option '-%c'", optopt);
             return (-1);
@@ -151,10 +167,15 @@ int rrd_fetch(
         return -1;
     }
 
+    status = rrdc_flush_if_daemon(opt_daemon, argv[optind]);
+    if (opt_daemon) free (opt_daemon);
+    if (status) return (-1);
+
     cf = argv[optind + 1];
 
-    if (rrd_fetch_r(argv[optind], cf, start, end, step, ds_cnt, ds_namv, data)
-        != 0)
+    status = rrd_fetch_r(argv[optind], cf, start, end, step,
+            ds_cnt, ds_namv, data);
+    if (status != 0)
         return (-1);
     return (0);
 }
@@ -179,7 +200,7 @@ int rrd_fetch_r(
 
     return (rrd_fetch_fn
             (filename, cf_idx, start, end, step, ds_cnt, ds_namv, data));
-}
+} /* int rrd_fetch_r */
 
 int rrd_fetch_fn(
     const char *filename,   /* name of the rrd */
diff --git a/src/rrd_flush.c b/src/rrd_flush.c
new file mode 100644 (file)
index 0000000..218a65a
--- /dev/null
@@ -0,0 +1,95 @@
+/**
+ * RRDTool - src/rrd_flush.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#include "rrd_tool.h"
+#include "rrd_client.h"
+
+int rrd_cmd_flush (int argc, char **argv)
+{
+    char *opt_daemon = NULL;
+    int status;
+
+    /* initialize getopt */
+    optind = 0;
+    opterr = 0;
+
+    while (42)
+    {
+        int opt;
+        static struct option long_options[] =
+        {
+            {"daemon", required_argument, 0, 'd'},
+            {0, 0, 0, 0}
+        };
+
+        opt = getopt_long(argc, argv, "d:", long_options, NULL);
+
+        if (opt == -1)
+            break;
+
+        switch (opt)
+        {
+            case 'd':
+                if (opt_daemon != NULL)
+                    free (opt_daemon);
+                opt_daemon = strdup (optarg);
+                if (opt_daemon == NULL)
+                {
+                    rrd_set_error ("strdup failed.");
+                    return (-1);
+                }
+                break;
+
+            default:
+                rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                        argv[0]);
+                return (-1);
+        }
+    } /* while (42) */
+
+    if ((argc - optind) != 1)
+    {
+        rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>", argv[0]);
+        return (-1);
+    }
+
+    /* try to connect to rrdcached */
+    status = rrdc_connect(opt_daemon);
+    if (opt_daemon) free(opt_daemon);
+    if (status != 0) return status;
+
+    if (! rrdc_is_connected(opt_daemon))
+    {
+        rrd_set_error ("Daemon address unknown. Please use the \"--daemon\" "
+                "option to set an address on the command line or set the "
+                "\"%s\" environment variable.",
+                ENV_RRDCACHED_ADDRESS);
+        return (-1);
+    }
+
+    status = rrdc_flush(argv[optind]);
+
+    return ((status == 0) ? 0 : -1);
+} /* int rrd_flush */
+
+/*
+ * vim: set sw=4 sts=4 et fdm=marker :
+ */
index cb0627c..ea5d8c3 100644 (file)
@@ -26,6 +26,7 @@
 #endif
 
 #include "rrd_graph.h"
+#include "rrd_client.h"
 
 /* some constant definitions */
 
@@ -305,6 +306,10 @@ int im_free(
 
     if (im == NULL)
         return 0;
+
+    if (im->daemon_addr != NULL)
+      free(im->daemon_addr);
+
     for (i = 0; i < (unsigned) im->gdes_c; i++) {
         if (im->gdes[i].data_first) {
             /* careful here, because a single pointer can occur several times */
@@ -833,6 +838,36 @@ int data_fetch(
         if (!skip) {
             unsigned long ft_step = im->gdes[i].step;   /* ft_step will record what we got from fetch */
 
+            /* Flush the file if
+             * - a connection to the daemon has been established
+             * - this is the first occurrence of that RRD file
+             */
+            if (rrdc_is_connected(im->daemon_addr))
+            {
+                int status;
+
+                status = 0;
+                for (ii = 0; ii < i; ii++)
+                {
+                    if (strcmp (im->gdes[i].rrd, im->gdes[ii].rrd) == 0)
+                    {
+                        status = 1;
+                        break;
+                    }
+                }
+
+                if (status == 0)
+                {
+                    status = rrdc_flush (im->gdes[i].rrd);
+                    if (status != 0)
+                    {
+                        rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                                im->gdes[i].rrd, status);
+                        return (-1);
+                    }
+                }
+            } /* if (rrdc_is_connected()) */
+
             if ((rrd_fetch_fn(im->gdes[i].rrd,
                               im->gdes[i].cf,
                               &im->gdes[i].start,
@@ -3710,6 +3745,7 @@ void rrd_graph_init(
 #endif
 #endif
     im->base = 1000;
+    im->daemon_addr = NULL;
     im->draw_x_grid = 1;
     im->draw_y_grid = 1;
     im->extra_flags = 0;
@@ -3856,6 +3892,7 @@ void rrd_graph_options(
         { "watermark",          required_argument, 0, 'W'},
         { "alt-y-mrtg",         no_argument,       0, 1000},    /* this has no effect it is just here to save old apps from crashing when they use it */
         { "pango-markup",       no_argument,       0, 'P'},
+        { "daemon",             required_argument, 0, 'd'},
         {  0, 0, 0, 0}
 };
 /* *INDENT-ON* */
@@ -3870,7 +3907,7 @@ void rrd_graph_options(
         int       col_start, col_end;
 
         opt = getopt_long(argc, argv,
-                          "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kP",
+                          "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kPd:",
                           long_options, &option_index);
         if (opt == EOF)
             break;
@@ -4212,6 +4249,24 @@ void rrd_graph_options(
             strncpy(im->watermark, optarg, 100);
             im->watermark[99] = '\0';
             break;
+        case 'd':
+        {
+            if (im->daemon_addr != NULL)
+            {
+                rrd_set_error ("You cannot specify --daemon "
+                        "more than once.");
+                return;
+            }
+
+            im->daemon_addr = strdup(optarg);
+            if (im->daemon_addr == NULL)
+            {
+              rrd_set_error("strdup failed");
+              return;
+            }
+
+            break;
+        }
         case '?':
             if (optopt != 0)
                 rrd_set_error("unknown option '%c'", optopt);
@@ -4219,6 +4274,11 @@ void rrd_graph_options(
                 rrd_set_error("unknown option '%s'", argv[optind - 1]);
             return;
         }
+    } /* while (1) */
+
+    {   /* try to connect to rrdcached */
+        int status = rrdc_connect(im->daemon_addr);
+        if (status != 0) return;
     }
     
     pango_cairo_context_set_font_options(pango_layout_get_context(im->layout), im->font_options);
index 6276508..8b86e86 100644 (file)
@@ -213,6 +213,7 @@ typedef struct image_desc_t {
     char     *imginfo;  /* construct an <IMG ... tag and return 
                            as first retval */
     enum gfx_if_en imgformat;   /* image format */
+    char     *daemon_addr;  /* rrdcached connection string */
     int       lazy;     /* only update the image if there is
                            reasonable probablility that the
                            existing one is out of date */
index 5641eda..679b024 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "rrd_tool.h"
 #include "rrd_rpncalc.h"
+#include "rrd_client.h"
 #include <stdarg.h>
 
 /* proto */
@@ -80,18 +81,59 @@ rrd_info_t *rrd_info(
     char **argv)
 {
     rrd_info_t *info;
+    char *opt_daemon = NULL;
+    int status;
 
-    if (argc < 2) {
-        rrd_set_error("please specify an rrd");
-        return NULL;
-    }
+    optind = 0;
+    opterr = 0;         /* initialize getopt */
 
-    info = rrd_info_r(argv[1]);
+    while (42) {
+        int       opt;
+        int       option_index = 0;
+        static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
+            {0, 0, 0, 0}
+        };
 
-    return (info);
-}
+        opt = getopt_long(argc, argv, "d:", long_options, &option_index);
 
+        if (opt == EOF)
+            break;
 
+        switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (NULL);
+            }
+            break;
+
+        default:
+            rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                    argv[0]);
+            return (NULL);
+            break;
+        }
+    }                   /* while (42) */
+
+    if ((argc - optind) != 1) {
+        rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                argv[0]);
+        return (NULL);
+    }
+
+    status = rrdc_flush_if_daemon(opt_daemon, argv[optind]);
+    if (opt_daemon) free (opt_daemon);
+    if (status) return (NULL);
+
+    info = rrd_info_r(argv[optind]);
+
+    return (info);
+} /* rrd_info_t *rrd_info */
 
 rrd_info_t *rrd_info_r(
     char *filename)
index 2749feb..517b65a 100644 (file)
@@ -7,19 +7,63 @@
  *****************************************************************************/
 
 #include "rrd_tool.h"
+#include "rrd_client.h"
 
 time_t rrd_last(
     int argc,
     char **argv)
 {
-    if (argc < 2) {
-        rrd_set_error("please specify an rrd");
+    char *opt_daemon = NULL;
+    int status;
+
+    optind = 0;
+    opterr = 0;         /* initialize getopt */
+
+    while (42) {
+        int       opt;
+        int       option_index = 0;
+        static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
+            {0, 0, 0, 0}
+        };
+
+        opt = getopt_long(argc, argv, "d:", long_options, &option_index);
+
+        if (opt == EOF)
+            break;
+
+        switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
+        default:
+            rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                    argv[0]);
+            return (-1);
+            break;
+        }
+    }                   /* while (42) */
+
+    if ((argc - optind) != 1) {
+        rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                argv[0]);
         return (-1);
     }
 
-    return (rrd_last_r(argv[1]));
-}
+    status = rrdc_flush_if_daemon(opt_daemon, argv[optind]);
+    if (opt_daemon) free(opt_daemon);
+    if (status) return (-1);
 
+    return (rrd_last_r (argv[optind]));
+}
 
 time_t rrd_last_r(
     const char *filename)
index e2ad334..62e3b8b 100644 (file)
 /*****************************************************************************
  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
+ *                Copyright by Florian Forster, 2008
  *****************************************************************************
  * rrd_lastupdate  Get the last datum entered for each DS
  *****************************************************************************/
 
 #include "rrd_tool.h"
 #include "rrd_rpncalc.h"
+#include "rrd_client.h"
 #include <stdarg.h>
 
-int rrd_lastupdate(
-    int argc,
-    char **argv,
-    time_t *last_update,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    char ***last_ds)
+int rrd_lastupdate (int argc, char **argv)
+{
+    time_t    last_update;
+    char    **ds_names;
+    char    **last_ds;
+    unsigned long ds_count, i;
+    int status;
+
+    char *opt_daemon = NULL;
+
+    optind = 0;
+    opterr = 0;         /* initialize getopt */
+
+    while (42) {
+        int       opt;
+        int       option_index = 0;
+        static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
+            {0, 0, 0, 0}
+        };
+
+        opt = getopt_long (argc, argv, "d:", long_options, &option_index);
+
+        if (opt == EOF)
+            break;
+
+        switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
+        default:
+            rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                    argv[0]);
+            return (-1);
+            break;
+        }
+    }                   /* while (42) */
+
+    if ((argc - optind) != 1) {
+        rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                argv[0]);
+        return (-1);
+    }
+
+    status = rrdc_flush_if_daemon(opt_daemon, argv[optind]);
+    if (opt_daemon) free (opt_daemon);
+    if (status) return (-1);
+
+    status = rrd_lastupdate_r (argv[optind],
+            &last_update, &ds_count, &ds_names, &last_ds);
+    if (status != 0)
+        return (status);
+
+    for (i = 0; i < ds_count; i++)
+        printf(" %s", ds_names[i]);
+    printf ("\n\n");
+
+    printf ("%10lu:", last_update);
+    for (i = 0; i < ds_count; i++) {
+        printf(" %s", last_ds[i]);
+        free(last_ds[i]);
+        free(ds_names[i]);
+    }
+    printf("\n");
+
+    free(last_ds);
+    free(ds_names);
+
+    return (0);
+} /* int rrd_lastupdate */
+
+int rrd_lastupdate_r(const char *filename,
+        time_t *ret_last_update,
+        unsigned long *ret_ds_count,
+        char ***ret_ds_names,
+        char ***ret_last_ds)
 {
     unsigned long i = 0;
-    char     *filename;
     rrd_t     rrd;
     rrd_file_t *rrd_file;
 
-    if (argc < 2) {
-        rrd_set_error("please specify an rrd");
-        goto err_out;
+    rrd_file = rrd_open(filename, &rrd, RRD_READONLY);
+    if (rrd_file == NULL) {
+        rrd_free(&rrd);
+        return (-1);
     }
-    filename = argv[1];
 
-    rrd_file = rrd_open(filename, &rrd, RRD_READONLY);
-    if (rrd_file == NULL)
-        goto err_free;
-
-    *last_update = rrd.live_head->last_up;
-    *ds_cnt = rrd.stat_head->ds_cnt;
-    if (((*ds_namv) =
-         (char **) malloc(rrd.stat_head->ds_cnt * sizeof(char *))) == NULL) {
-        rrd_set_error("malloc fetch ds_namv array");
-        goto err_close;
+    *ret_last_update = rrd.live_head->last_up;
+    *ret_ds_count = rrd.stat_head->ds_cnt;
+    *ret_ds_names = (char **) malloc (rrd.stat_head->ds_cnt * sizeof(char *));
+    if (*ret_ds_names == NULL) {
+        rrd_set_error ("malloc fetch ret_ds_names array");
+        rrd_close (rrd_file);
+        rrd_free (&rrd);
+        return (-1);
     }
+    memset (*ret_ds_names, 0, rrd.stat_head->ds_cnt * sizeof(char *));
 
-    if (((*last_ds) =
-         (char **) malloc(rrd.stat_head->ds_cnt * sizeof(char *))) == NULL) {
-        rrd_set_error("malloc fetch last_ds array");
-        goto err_free_ds_namv;
+    *ret_last_ds = (char **) malloc (rrd.stat_head->ds_cnt * sizeof(char *));
+    if (*ret_last_ds == NULL) {
+        rrd_set_error ("malloc fetch ret_last_ds array");
+        free (*ret_ds_names);
+        *ret_ds_names = NULL;
+        rrd_close (rrd_file);
+        rrd_free (&rrd);
+        return (-1);
     }
+    memset (*ret_last_ds, 0, rrd.stat_head->ds_cnt * sizeof(char *));
 
     for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
-        (*ds_namv)[i] = sprintf_alloc("%s", rrd.ds_def[i].ds_nam);
-        (*last_ds)[i] = sprintf_alloc("%s", rrd.pdp_prep[i].last_ds);
+        (*ret_ds_names)[i] = sprintf_alloc("%s", rrd.ds_def[i].ds_nam);
+        (*ret_last_ds)[i] = sprintf_alloc("%s", rrd.pdp_prep[i].last_ds);
+
+        if (((*ret_ds_names)[i] == NULL) || ((*ret_last_ds)[i] == NULL))
+            break;
+    }
+
+    /* Check if all names and values could be copied and free everything if
+     * not. */
+    if (i < rrd.stat_head->ds_cnt) {
+        rrd_set_error ("sprintf_alloc failed");
+        for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
+            if ((*ret_ds_names)[i] != NULL)
+            {
+                free ((*ret_ds_names)[i]);
+                (*ret_ds_names)[i] = NULL;
+            }
+            if ((*ret_last_ds)[i] != NULL)
+            {
+                free ((*ret_last_ds)[i]);
+                (*ret_last_ds)[i] = NULL;
+            }
+        }
+        free (*ret_ds_names);
+        *ret_ds_names = NULL;
+        free (*ret_last_ds);
+        *ret_last_ds = NULL;
+        rrd_close (rrd_file);
+        rrd_free (&rrd);
+        return (-1);
     }
 
     rrd_free(&rrd);
     rrd_close(rrd_file);
     return (0);
-
-  err_free_ds_namv:
-    free(*ds_namv);
-  err_close:
-    rrd_close(rrd_file);
-  err_free:
-    rrd_free(&rrd);
-  err_out:
-    return (-1);
-}
+} /* int rrd_lastupdate_r */
index d50e608..d054386 100644 (file)
@@ -55,7 +55,7 @@ void PrintUsage(
         N_
         ("Valid commands: create, update, updatev, graph, graphv,  dump, restore,\n"
          "\t\tlast, lastupdate, first, info, fetch, tune,\n"
-         "\t\tresize, xport\n\n");
+         "\t\tresize, xport, flush\n\n");
 
     const char *help_listremote =
         N_("Valid remote commands: quit, ls, cd, mkdir, pwd\n\n");
@@ -95,7 +95,8 @@ void PrintUsage(
     const char *help_update =
         N_("* update - update an RRD\n\n"
            "\trrdtool update filename\n"
-           "\t\t--template|-t ds-name:ds-name:...\n"
+           "\t\t[--template|-t ds-name:ds-name:...]\n"
+          "\t\t[--daemon <address>]\n"
            "\t\ttime|N:value[:value...]\n\n"
            "\t\tat-time@value[:value...]\n\n"
            "\t\t[ time:value[:value...] ..]\n\n");
@@ -104,7 +105,7 @@ void PrintUsage(
         N_("* updatev - a verbose version of update\n"
            "\treturns information about values, RRAs, and datasources updated\n\n"
            "\trrdtool updatev filename\n"
-           "\t\t--template|-t ds-name:ds-name:...\n"
+           "\t\t[--template|-t ds-name:ds-name:...]\n"
            "\t\ttime|N:value[:value...]\n\n"
            "\t\tat-time@value[:value...]\n\n"
            "\t\t[ time:value[:value...] ..]\n\n");
@@ -113,7 +114,13 @@ void PrintUsage(
         N_("* fetch - fetch data out of an RRD\n\n"
            "\trrdtool fetch filename.rrd CF\n"
            "\t\t[-r|--resolution resolution]\n"
-           "\t\t[-s|--start start] [-e|--end end]\n\n");
+           "\t\t[-s|--start start] [-e|--end end]\n"
+          "\t\t[--daemon <address>]\n\n");
+
+    const char *help_flush =
+        N_("* flush - flush cached data out to an RRD file\n\n"
+           "\trrdtool flush filename.rrd\n"
+          "\t\t[--daemon <address>]\n\n");
 
 /* break up very large strings (help_graph, help_tune) for ISO C89 compliance*/
 
@@ -132,7 +139,7 @@ void PrintUsage(
            "\t\t[-h|--height pixels] [-o|--logarithmic]\n"
            "\t\t[-u|--upper-limit value] [-z|--lazy]\n"
            "\t\t[-l|--lower-limit value] [-r|--rigid]\n"
-           "\t\t[-g|--no-legend]\n"
+           "\t\t[-g|--no-legend] [--daemon <address>]\n"
            "\t\t[-F|--force-rules-legend]\n" "\t\t[-j|--only-graph]\n");
     const char *help_graph2 =
         N_("\t\t[-n|--font FONTTAG:size:font]\n"
@@ -217,7 +224,7 @@ void PrintUsage(
         C_LASTUPDATE, C_FIRST, C_UPDATE, C_FETCH, C_GRAPH, C_GRAPHV,
         C_TUNE,
         C_RESIZE, C_XPORT, C_QUIT, C_LS, C_CD, C_MKDIR, C_PWD,
-        C_UPDATEV
+        C_UPDATEV, C_FLUSH
     };
     int       help_cmd = C_NONE;
 
@@ -242,6 +249,8 @@ void PrintUsage(
             help_cmd = C_UPDATEV;
         else if (!strcmp(cmd, "fetch"))
             help_cmd = C_FETCH;
+        else if (!strcmp(cmd, "flush"))
+            help_cmd = C_FLUSH;
         else if (!strcmp(cmd, "graph"))
             help_cmd = C_GRAPH;
         else if (!strcmp(cmd, "graphv"))
@@ -302,6 +311,9 @@ void PrintUsage(
     case C_FETCH:
         fputs(_(help_fetch), stdout);
         break;
+    case C_FLUSH:
+        fputs(_(help_flush), stdout);
+        break;
     case C_GRAPH:
         fputs(_(help_graph0), stdout);
         fputs(_(help_graph1), stdout);
@@ -647,26 +659,7 @@ int HandleInputLine(
     else if (strcmp("last", argv[1]) == 0)
         printf("%ld\n", rrd_last(argc - 1, &argv[1]));
     else if (strcmp("lastupdate", argv[1]) == 0) {
-        time_t    last_update;
-        char    **ds_namv;
-        char    **last_ds;
-        unsigned long ds_cnt, i;
-
-        if (rrd_lastupdate(argc - 1, &argv[1], &last_update,
-                           &ds_cnt, &ds_namv, &last_ds) == 0) {
-            for (i = 0; i < ds_cnt; i++)
-                printf(" %s", ds_namv[i]);
-            printf("\n\n");
-            printf("%10lu:", last_update);
-            for (i = 0; i < ds_cnt; i++) {
-                printf(" %s", last_ds[i]);
-                free(last_ds[i]);
-                free(ds_namv[i]);
-            }
-            printf("\n");
-            free(last_ds);
-            free(ds_namv);
-        }
+        rrd_lastupdate(argc - 1, &argv[1]);
     } else if (strcmp("first", argv[1]) == 0)
         printf("%ld\n", rrd_first(argc - 1, &argv[1]));
     else if (strcmp("update", argv[1]) == 0)
@@ -815,6 +808,8 @@ int HandleInputLine(
 
     } else if (strcmp("tune", argv[1]) == 0)
         rrd_tune(argc - 1, &argv[1]);
+    else if (strcmp("flush", argv[1]) == 0)
+        rrd_cmd_flush(argc - 1, &argv[1]);
     else {
         rrd_set_error("unknown function '%s'", argv[1]);
     }
index 858b82b..43781da 100644 (file)
@@ -83,15 +83,14 @@ extern    "C" {
     int       rrd_create_fn(
     const char *file_name,
     rrd_t *rrd);
-    int       rrd_fetch_fn(
-    const char *filename,
-    enum cf_en cf_idx,
-    time_t *start,
-    time_t *end,
-    unsigned long *step,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    rrd_value_t **data);
+    int rrd_fetch_fn (const char *filename,
+            enum cf_en cf_idx,
+            time_t *start,
+            time_t *end,
+            unsigned long *step,
+            unsigned long *ds_cnt,
+            char ***ds_namv,
+            rrd_value_t **data);
 
 #define RRD_READONLY    (1<<0)
 #define RRD_READWRITE   (1<<1)
@@ -110,7 +109,7 @@ extern    "C" {
     char *a,
     char *b);
 
-#endif
+#endif /* _RRD_TOOL_H */
 
 #ifdef  __cplusplus
 }
index d7ee4ac..9d3e854 100644 (file)
@@ -1,6 +1,7 @@
 
 /*****************************************************************************
  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
+ *                Copyright by Florian Forster, 2008
  *****************************************************************************
  * rrd_update.c  RRD Update Function
  *****************************************************************************
@@ -23,6 +24,8 @@
 #include "rrd_is_thread_safe.h"
 #include "unused.h"
 
+#include "rrd_client.h"
+
 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
 /*
  * WIN32 does not have gettimeofday    and struct timeval. This is a quick and dirty
@@ -319,6 +322,7 @@ rrd_info_t *rrd_update_v(
     char     *tmplt = NULL;
     rrd_info_t *result = NULL;
     rrd_infoval_t rc;
+    char *opt_daemon = NULL;
     struct option long_options[] = {
         {"template", required_argument, 0, 't'},
         {0, 0, 0, 0}
@@ -348,6 +352,15 @@ rrd_info_t *rrd_update_v(
         }
     }
 
+    opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
+    if (opt_daemon != NULL) {
+        rrd_set_error ("The \"%s\" environment variable is defined, "
+                "but \"%s\" cannot work with rrdcached. Either unset "
+                "the environment variable or use \"update\" instead.",
+                ENV_RRDCACHED_ADDRESS, argv[0]);
+        goto end_tag;
+    }
+
     /* need at least 2 arguments: filename, data. */
     if (argc - optind < 2) {
         rrd_set_error("Not enough arguments");
@@ -369,18 +382,20 @@ int rrd_update(
 {
     struct option long_options[] = {
         {"template", required_argument, 0, 't'},
+        {"daemon",   required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
     int       option_index = 0;
     int       opt;
     char     *tmplt = NULL;
     int       rc = -1;
+    char     *opt_daemon = NULL;
 
     optind = 0;
     opterr = 0;         /* initialize getopt */
 
     while (1) {
-        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -390,6 +405,17 @@ int rrd_update(
             tmplt = strdup(optarg);
             break;
 
+        case 'd':
+            if (opt_daemon != NULL)
+                free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                goto out;
+            }
+            break;
+
         case '?':
             rrd_set_error("unknown option '%s'", argv[optind - 1]);
             goto out;
@@ -402,10 +428,48 @@ int rrd_update(
         goto out;
     }
 
-    rc = rrd_update_r(argv[optind], tmplt,
-                      argc - optind - 1, (const char **) (argv + optind + 1));
+    {   /* try to connect to rrdcached */
+        int status = rrdc_connect(opt_daemon);
+        if (status != 0) return status;
+    }
+
+    if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
+    {
+        rrd_set_error("The caching daemon cannot be used together with "
+                "templates yet.");
+        goto out;
+    }
+
+    if (! rrdc_is_connected(opt_daemon))
+    {
+      rc = rrd_update_r(argv[optind], tmplt,
+                        argc - optind - 1, (const char **) (argv + optind + 1));
+    }
+    else /* we are connected */
+    {
+        rc = rrdc_update (argv[optind], /* file */
+                          argc - optind - 1, /* values_num */
+                          (void *) (argv + optind + 1)); /* values */
+        if (rc != 0)
+        {
+            rrd_set_error("Failed sending the values to rrdcached: %s",
+                    (rc < 0)
+                    ? "Internal error"
+                    : rrd_strerror (rc));
+        }
+    }
+
   out:
-    free(tmplt);
+    if (tmplt != NULL)
+    {
+        free(tmplt);
+        tmplt = NULL;
+    }
+    if (opt_daemon != NULL)
+    {
+        free (opt_daemon);
+        opt_daemon = NULL;
+    }
     return rc;
 }
 
index 0dc6486..c561a5a 100644 (file)
@@ -10,6 +10,7 @@
 #include "rrd_graph.h"
 #include "rrd_xport.h"
 #include "unused.h"
+#include "rrd_client.h"
 
 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
 #include <io.h>
@@ -53,17 +54,19 @@ int rrd_xport(
     char ***legend_v,   /* legend entries */
     rrd_value_t **data)
 {                       /* two dimensional array containing the data */
-
     image_desc_t im;
     time_t    start_tmp = 0, end_tmp = 0;
     rrd_time_value_t start_tv, end_tv;
     char     *parsetime_error = NULL;
+    char     *opt_daemon = NULL;
+
     struct option long_options[] = {
         {"start", required_argument, 0, 's'},
         {"end", required_argument, 0, 'e'},
         {"maxrows", required_argument, 0, 'm'},
         {"step", required_argument, 0, 261},
         {"enumds", no_argument, 0, 262},    /* these are handled in the frontend ... */
+        {"daemon", required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
 
@@ -79,7 +82,7 @@ int rrd_xport(
         int       option_index = 0;
         int       opt;
 
-        opt = getopt_long(argc, argv, "s:e:m:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "s:e:m:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -109,6 +112,24 @@ int rrd_xport(
                 return -1;
             }
             break;
+        case 'd':
+        {
+            if (opt_daemon != NULL)
+            {
+                rrd_set_error ("You cannot specify --daemon "
+                        "more than once.");
+                return (-1);
+            }
+
+            opt_daemon = strdup(optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup error");
+                return -1;
+            }
+            break;
+        }
+
         case '?':
             rrd_set_error("unknown option '%s'", argv[optind - 1]);
             return -1;
@@ -147,6 +168,12 @@ int rrd_xport(
         return (-1);
     }
 
+    {   /* try to connect to rrdcached */
+        int status = rrdc_connect(opt_daemon);
+        if (opt_daemon) free(opt_daemon);
+        if (status != 0) return status;
+    }
+
     if (rrd_xport_fn(&im, start, end, step, col_cnt, legend_v, data) == -1) {
         im_free(&im);
         return -1;