From: Florian Forster Date: Sun, 29 Jun 2008 21:38:36 +0000 (+0200) Subject: Merge branch 'master' into ff/rrdd X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=commitdiff_plain;h=9fd92a932867262d4f3eb239f05473252f1e98b2;hp=5c3b824e73337ebc150268f929246991b68d44f3 Merge branch 'master' into ff/rrdd --- diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 847c1d1..a2dc232 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -21,7 +21,7 @@ David Grimes SQRT/SORT/REV/SHIFT/TREND David L. Barker xport function bug fixes Evan Miller Multiplicative HW Enhancements Frank Strauss TCL bindings -Florian octo Forster rrd_restore libxml2 rewrite deprecated function export +Florian octo Forster rrd_restore libxml2 rewrite, deprecated function export, rrdcached Henrik Storner functions for min/max values of data in graph Hermann Hueni (SunOS porting) Jakob Ilves HPUX 11 diff --git a/configure.ac b/configure.ac index 7a15b5c..822626e 100644 --- a/configure.ac +++ b/configure.ac @@ -355,8 +355,9 @@ CONFIGURE_PART(Audit Compilation Environment) dnl Check for the compiler and static/shared library creation. -AC_PROG_CC AC_PROG_CPP +AC_PROG_CC +AM_PROG_CC_C_O AC_PROG_LIBTOOL dnl Try to detect/use GNU features @@ -674,7 +675,7 @@ EX_CHECK_ALL(cairo, cairo_font_options_create, cairo.h, EX_CHECK_ALL(cairo, cairo_svg_surface_create, cairo-svg.h, cairo-svg, 1.4.6, http://cairographics.org/releases/, "") EX_CHECK_ALL(cairo, cairo_pdf_surface_create, cairo-pdf.h, cairo-pdf, 1.4.6, http://cairographics.org/releases/, "") EX_CHECK_ALL(cairo, cairo_ps_surface_create, cairo-ps.h, cairo-ps, 1.4.6, http://cairographics.org/releases/, "") -dnl EX_CHECK_ALL(glib-2.0, glib_check_version, glib.h, glib-2.0, 2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "") +EX_CHECK_ALL(glib-2.0, glib_check_version, glib.h, glib-2.0, 2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "") EX_CHECK_ALL(pango-1.0, pango_cairo_context_set_font_options, pango/pango.h, pangocairo, 1.17, http://ftp.gnome.org/pub/GNOME/sources/pango/1.17, "") EX_CHECK_ALL(xml2, xmlParseFile, libxml/parser.h, libxml-2.0, 2.6.31, http://xmlsoft.org/downloads.html, /usr/include/libxml2) diff --git a/doc/Makefile.am b/doc/Makefile.am index 16fd617..05d637d 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -10,7 +10,7 @@ CLEANFILES = *.1 *.html *.txt *-dircache RRD?.pod *.pdf *~ core *itemcache *.rej POD = bin_dec_hex.pod rrddump.pod rrdgraph_examples.pod rrdrestore.pod rrdupdate.pod \ cdeftutorial.pod rrdfetch.pod rrdgraph_graph.pod rrdthreads.pod rrdxport.pod \ - rpntutorial.pod rrdfirst.pod rrdgraph_rpn.pod rrdtool.pod \ + rpntutorial.pod rrdfirst.pod rrdgraph_rpn.pod rrdtool.pod rrdcached.pod \ rrd-beginners.pod rrdinfo.pod rrdtune.pod rrdbuild.pod \ rrdcgi.pod rrdgraph.pod rrdlast.pod rrdlastupdate.pod \ rrdcreate.pod rrdgraph_data.pod rrdresize.pod rrdtutorial.pod diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod new file mode 100644 index 0000000..297f009 --- /dev/null +++ b/doc/rrdcached.pod @@ -0,0 +1,64 @@ +=pod + +=head1 NAME + +rrdcached - Data caching daemon for rrdtool + +=head1 SYNOPSIS + +B [B<-l> I
] [B<-w> I] [B<-f> I] + +=head1 DESCRIPTION + +B is a daemon that receives updates to existing RRD files, +accumulates them and, if enough have been received or a defined time has +passed, writes the updates to the RRD file. A I command may be used to +force writing of values to disk, so that graphing facilities and similar can +work with up-to-date data. + +=head1 OPTIONS + +=over 4 + +=item B<-l> I
+ +Tells the daemon to bind to I
and accept incoming connections on that +socket. If I
begins with C, everything following that prefix is +interpreted as the path to a UNIX domain socket. Otherwise the address or node +name are resolved using L. + +If the B<-l> option is not specified the default address, +C, will be used. + +=item B<-w> I + +Data is written to disk every I seconds. If this option is not +specified the default interval of 300Eseconds will be used. + +=item B<-f> I + +Every I seconds the entire cache is searched for old values which are +written to disk. This only concerns files to which updates have stopped, so +setting this to a high value, such as 3600Eseconds, is acceptable in most +cases. This timeout defaults to 3600Eseconds. + +=back + +=head1 BUGS + +=over 4 + +=item + +Base directory is currently hard coded. The daemon will chdir to C. + +=back + +=head1 SEE ALSO + +L, L + +=head1 AUHOR + +B and this manual page have been written by Florian Forster +EoctoEatEverplant.orgE. diff --git a/doc/rrdfetch.pod b/doc/rrdfetch.pod index 51b5ccd..9ef417a 100644 --- a/doc/rrdfetch.pod +++ b/doc/rrdfetch.pod @@ -48,6 +48,17 @@ the end of the time series in seconds since epoch. See also AT-STYLE TIME SPECIFICATION section for a detailed explanation of how to specify the end time. +=item B<--daemon> I
+ +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows B to return +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool fetch --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd AVERAGE + =back =head2 RESOLUTION INTERVAL @@ -257,6 +268,21 @@ I<931225537> -- 18:45 July 5th, 1999 I<19970703 12:45> -- 12:45 July 3th, 1997 (my favorite, and its even got an ISO number (8601)). +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cfetch>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + =head1 AUTHOR Tobias Oetiker diff --git a/doc/rrdgraph.pod b/doc/rrdgraph.pod index 092e37f..99b2e6b 100644 --- a/doc/rrdgraph.pod +++ b/doc/rrdgraph.pod @@ -248,7 +248,7 @@ to the more robust B<--alt-y-grid> mode. How many digits should rrdtool assume the y-axis labels to be? You may have to use this option to make enough space once you start -fideling with the y-axis labeling. +fiddling with the y-axis labeling. [B<--units=si>] @@ -267,6 +267,17 @@ Note, that only the image size will be returned, if you run with lazy even when using graphv. +[B<--daemon> I
] + +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows the graph to contain +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool graph [...] --daemon unix:/var/run/rrdcached.sock [...] + [B<-f>|B<--imginfo> I] After the image has been created, the graph function uses printf @@ -441,8 +452,6 @@ at least one print statement to generate a report. See L for the exact format. -=back - =head2 graphv Calling rrdtool with the graphv option will return information in the @@ -471,6 +480,21 @@ There is more information returned than in the standard interface. Especially the 'graph_*' keys are new. They help applications that want to know what is where on the graph. +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cgraph>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + =head1 SEE ALSO L gives an overview of how B works. diff --git a/doc/rrdupdate.pod b/doc/rrdupdate.pod index cc0b452..d61cf12 100644 --- a/doc/rrdupdate.pod +++ b/doc/rrdupdate.pod @@ -6,6 +6,7 @@ rrdupdate - Store a new set of values into the RRD B {B | B} I S<[B<--template>|B<-t> I[B<:>I]...]> +S<[B<--daemon> I
]> S|IB<:>I[B<:>I...]> SB<@>I[B<:>I...]> S<[IB<:>I[B<:>I...] ...]> @@ -29,6 +30,9 @@ RRA (consolidation function and PDPs per CDP), and data source (name). Note that depending on the arguments of the current and previous call to update, the list may have no entries or a large number of entries. +Since B requires direct disk access, the B<--daemon> option cannot be +used with this command. + =item I The name of the B you want to update. @@ -56,6 +60,18 @@ function. If this is done accidentally (and this can only be done using the template switch), B will ignore the value specified for the COMPUTE B. +=item B<--daemon> I
+ +If given, B will try to connect to the caching daemon L +at I
and will fail if the connection cannot be established. If the +connection is successfully established the values will be sent to the daemon +instead of accessing the files directly. If I
begins with C +then everything after this prefix will be considered to be a UNIX domain +socket, see L below. Otherwise the address is interpreted as network +address or node name as understood by L. One practical +consequence is that both, IPv4 and IPv6, may be used if the system supports +it. This option is available for the B command only. + =item B|IB<:>I[B<:>I...] The data used for updating the RRD was acquired at a certain @@ -82,20 +98,65 @@ separator. =back -=head1 EXAMPLE +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cupdate>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + +=head1 EXAMPLES + +=over 4 + +=item C Update the database file demo1.rrd with 3 known and one I<*UNKNOWN*> value. Use the current time as the update time. +=item + C Update the database file demo2.rrd which expects data from a single data-source, three times. First with an I<*UNKNOWN*> value then with two regular readings. The update interval seems to be around 300 seconds. -=head1 AUTHOR +=item + +C + +Update the file C with a single data source, using the +current time. If the caching daemon cannot be reached, do B fall back to +direct file access. + +=item + +C + +Use the UNIX domain socket C to contact the caching daemon. If +the caching daemon is not available, update the file C directly. +B Since a relative path is specified, the following disturbing effect +may occur: If the daemon is available, the file relative to the working +directory B is used. If the daemon is not available, the file +relative to the current working directory of the invoking process is used. +B Don't do relative paths, kids! + +=back + +=head1 AUTHORS -Tobias Oetiker +Tobias Oetiker , +Florian Forster atEverplant.org> diff --git a/src/Makefile.am b/src/Makefile.am index c567679..7b97fff 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -25,6 +25,7 @@ UPD_C_FILES = \ rrd_info.c \ rrd_error.c \ rrd_open.c \ + rrd_client.c \ rrd_nan_inf.c \ rrd_rpncalc.c \ rrd_update.c @@ -81,7 +82,7 @@ librrd_th_la_LIBADD = $(ALL_LIBS) include_HEADERS = rrd.h rrd_format.h -bin_PROGRAMS = rrdtool rrdupdate +bin_PROGRAMS = rrdtool rrdupdate rrdcached if BUILD_RRDCGI bin_PROGRAMS += rrdcgi @@ -97,6 +98,11 @@ rrdtool_SOURCES = rrd_tool.c rrdtool_DEPENDENCIES = librrd.la rrdtool_LDADD = librrd.la +rrdcached_SOURCES = rrd_daemon.c +rrdcached_DEPENDENCIES = librrd.la +rrdcached_CPPFLAGS = -DVERSION='"$(VERSION)"' -DLOCALSTATEDIR='"$(localstatedir)"' +rrdcached_LDADD = librrd.la + # strftime is here because we do not usually need it. unices have propper # iso date support EXTRA_DIST= strftime.c strftime.h \ diff --git a/src/rrd.h b/src/rrd.h index 31fd468..daed0e2 100644 --- a/src/rrd.h +++ b/src/rrd.h @@ -217,15 +217,16 @@ extern "C" { const char *_template, int argc, const char **argv); - int rrd_fetch_r( - const char *filename, - const char *cf, - time_t *start, - time_t *end, - unsigned long *step, - unsigned long *ds_cnt, - char ***ds_namv, - rrd_value_t **data); + int rrd_fetch_r ( + const char *filename, + const char *cf, + time_t *start, + time_t *end, + unsigned long *step, + const char *daemon, + unsigned long *ds_cnt, + char ***ds_namv, + rrd_value_t **data); int rrd_dump_r( const char *filename, char *outname); diff --git a/src/rrd_client.c b/src/rrd_client.c new file mode 100644 index 0000000..d9a7468 --- /dev/null +++ b/src/rrd_client.c @@ -0,0 +1,432 @@ +/** + * RRDTool - src/rrd_client.c + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#include "rrd.h" +#include "rrd_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +static int sd = -1; + +static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (sd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + { + close (sd); + sd = -1; + errno = EPROTO; + return (-1); + } + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (sd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft = nleft - status; + ptr = ptr + status; + } + + return (0); +} /* }}} ssize_t swrite */ + +static int buffer_add_string (const char *str, /* {{{ */ + char **buffer_ret, size_t *buffer_size_ret) +{ + char *buffer; + size_t buffer_size; + size_t buffer_pos; + size_t i; + int status; + + buffer = *buffer_ret; + buffer_size = *buffer_size_ret; + buffer_pos = 0; + + i = 0; + status = -1; + while (buffer_pos < buffer_size) + { + if (str[i] == 0) + { + buffer[buffer_pos] = ' '; + buffer_pos++; + status = 0; + break; + } + else if ((str[i] == ' ') || (str[i] == '\\')) + { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer[buffer_pos] = '\\'; + buffer_pos++; + buffer[buffer_pos] = str[i]; + buffer_pos++; + } + else + { + buffer[buffer_pos] = str[i]; + buffer_pos++; + } + i++; + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) + return (-1); + + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; + + return (0); +} /* }}} int buffer_add_string */ + +static int buffer_add_value (const char *value, /* {{{ */ + char **buffer_ret, size_t *buffer_size_ret) +{ + char temp[4096]; + + if (strncmp (value, "N:", 2) == 0) + snprintf (temp, sizeof (temp), "%lu:%s", + (unsigned long) time (NULL), value + 2); + else + strncpy (temp, value, sizeof (temp)); + temp[sizeof (temp) - 1] = 0; + + return (buffer_add_string (temp, buffer_ret, buffer_size_ret)); +} /* }}} int buffer_add_value */ + +static int rrdc_connect_unix (const char *path) /* {{{ */ +{ + struct sockaddr_un sa; + int status; + + assert (path != NULL); + + pthread_mutex_lock (&lock); + + if (sd >= 0) + { + pthread_mutex_unlock (&lock); + return (0); + } + + sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); + if (sd < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + + memset (&sa, 0, sizeof (sa)); + sa.sun_family = AF_UNIX; + strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1); + + status = connect (sd, (struct sockaddr *) &sa, sizeof (sa)); + if (status != 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + + pthread_mutex_unlock (&lock); + + return (0); +} /* }}} int rrdc_connect_unix */ + +int rrdc_connect (const char *addr) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_res; + struct addrinfo *ai_ptr; + int status; + + if (addr == NULL) + addr = RRDCACHED_DEFAULT_ADDRESS; + + if (strncmp ("unix:", addr, strlen ("unix:")) == 0) + return (rrdc_connect_unix (addr + strlen ("unix:"))); + else if (addr[0] == '/') + return (rrdc_connect_unix (addr)); + + pthread_mutex_lock (&lock); + + if (sd >= 0) + { + pthread_mutex_unlock (&lock); + return (0); + } + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_res = NULL; + status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (sd < 0) + { + status = errno; + sd = -1; + continue; + } + + status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + status = errno; + close (sd); + sd = -1; + continue; + } + + assert (status == 0); + break; + } /* for (ai_ptr) */ + pthread_mutex_unlock (&lock); + + return (status); +} /* }}} int rrdc_connect */ + +int rrdc_disconnect (void) /* {{{ */ +{ + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (0); + } + + close (sd); + sd = -1; + + pthread_mutex_unlock (&lock); + + return (0); +} /* }}} int rrdc_disconnect */ + +int rrdc_update (const char *filename, int values_num, /* {{{ */ + const char * const *values) +{ + char buffer[4096]; + char *buffer_ptr; + size_t buffer_free; + size_t buffer_size; + int status; + int i; + + memset (buffer, 0, sizeof (buffer)); + buffer_ptr = &buffer[0]; + buffer_free = sizeof (buffer); + + status = buffer_add_string ("update", &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + status = buffer_add_string (filename, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + for (i = 0; i < values_num; i++) + { + status = buffer_add_value (values[i], &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + } + + assert (buffer_free < sizeof (buffer)); + buffer_size = sizeof (buffer) - buffer_free; + assert (buffer[buffer_size - 1] == ' '); + buffer[buffer_size - 1] = '\n'; + + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (ENOTCONN); + } + + status = swrite (buffer, buffer_size); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + status = sread (buffer, sizeof (buffer)); + if (status < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + else if (status == 0) + { + pthread_mutex_unlock (&lock); + return (ENODATA); + } + + pthread_mutex_unlock (&lock); + + status = atoi (buffer); + return (status); +} /* }}} int rrdc_update */ + +int rrdc_flush (const char *filename) /* {{{ */ +{ + char buffer[4096]; + char *buffer_ptr; + size_t buffer_free; + size_t buffer_size; + int status; + + if (filename == NULL) + return (-1); + + memset (buffer, 0, sizeof (buffer)); + buffer_ptr = &buffer[0]; + buffer_free = sizeof (buffer); + + status = buffer_add_string ("flush", &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + status = buffer_add_string (filename, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + assert (buffer_free < sizeof (buffer)); + buffer_size = sizeof (buffer) - buffer_free; + assert (buffer[buffer_size - 1] == ' '); + buffer[buffer_size - 1] = '\n'; + + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (ENOTCONN); + } + + status = swrite (buffer, buffer_size); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + status = sread (buffer, sizeof (buffer)); + if (status < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + else if (status == 0) + { + pthread_mutex_unlock (&lock); + return (ENODATA); + } + + pthread_mutex_unlock (&lock); + + status = atoi (buffer); + return (status); +} /* }}} int rrdc_flush */ + +/* + * vim: set sw=2 sts=2 ts=8 et fdm=marker : + */ diff --git a/src/rrd_client.h b/src/rrd_client.h new file mode 100644 index 0000000..92d4c07 --- /dev/null +++ b/src/rrd_client.h @@ -0,0 +1,40 @@ +/** + * RRDTool - src/rrd_client.h + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#ifndef __RRD_CLIENT_H +#define __RRD_CLIENT_H 1 + +#ifndef RRDCACHED_DEFAULT_ADDRESS +# define RRDCACHED_DEFAULT_ADDRESS "unix:/tmp/rrdcached.sock" +#endif + +#define RRDCACHED_DEFAULT_PORT "42217" +#define ENV_RRDCACHED_ADDRESS "RRDCACHED_ADDRESS" + +int rrdc_connect (const char *addr); +int rrdc_disconnect (void); + +int rrdc_update (const char *filename, int values_num, + const char * const *values); + +int rrdc_flush (const char *filename); + +#endif /* __RRD_CLIENT_H */ diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c new file mode 100644 index 0000000..87ac8e9 --- /dev/null +++ b/src/rrd_daemon.c @@ -0,0 +1,1350 @@ +/** + * RRDTool - src/rrd_daemon.c + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +/* + * First tell the compiler to stick to the C99 and POSIX standards as close as + * possible. + */ +#ifndef __STRICT_ANSI__ /* {{{ */ +# define __STRICT_ANSI__ +#endif + +#ifndef _ISOC99_SOURCE +# define _ISOC99_SOURCE +#endif + +#ifdef _POSIX_C_SOURCE +# undef _POSIX_C_SOURCE +#endif +#define _POSIX_C_SOURCE 200112L + +/* Single UNIX needed for strdup. */ +#ifdef _XOPEN_SOURCE +# undef _XOPEN_SOURCE +#endif +#define _XOPEN_SOURCE 500 + +#ifndef _REENTRANT +# define _REENTRANT +#endif + +#ifndef _THREAD_SAFE +# define _THREAD_SAFE +#endif + +#ifdef _GNU_SOURCE +# undef _GNU_SOURCE +#endif +/* }}} */ + +/* + * Now for some includes.. + */ +#include "rrd.h" /* {{{ */ +#include "rrd_client.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +/* }}} */ + +#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) + +#ifndef __GNUC__ +# define __attribute__(x) /**/ +#endif + +/* + * Types + */ +struct listen_socket_s +{ + int fd; + char path[PATH_MAX + 1]; +}; +typedef struct listen_socket_s listen_socket_t; + +struct cache_item_s; +typedef struct cache_item_s cache_item_t; +struct cache_item_s +{ + char *file; + char **values; + int values_num; + time_t last_flush_time; +#define CI_FLAGS_IN_TREE 0x01 +#define CI_FLAGS_IN_QUEUE 0x02 + int flags; + + cache_item_t *next; +}; + +enum queue_side_e +{ + HEAD, + TAIL +}; +typedef enum queue_side_e queue_side_t; + +/* + * Variables + */ +static listen_socket_t *listen_fds = NULL; +static size_t listen_fds_num = 0; + +static int do_shutdown = 0; + +static pthread_t queue_thread; + +static pthread_t *connetion_threads = NULL; +static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connetion_threads_num = 0; + +/* Cache stuff */ +static GTree *cache_tree = NULL; +static cache_item_t *cache_queue_head = NULL; +static cache_item_t *cache_queue_tail = NULL; +static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; + +static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; + +static int config_write_interval = 300; +static int config_flush_interval = 3600; +static char *config_pid_file = NULL; +static char *config_base_dir = NULL; + +static char **config_listen_address_list = NULL; +static int config_listen_address_list_len = 0; + +/* + * Functions + */ +static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_int_handler */ + +static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_term_handler */ + +static int write_pidfile (void) /* {{{ */ +{ + pid_t pid; + char *file; + FILE *fh; + + pid = getpid (); + + file = (config_pid_file != NULL) + ? config_pid_file + : LOCALSTATEDIR "/run/rrdcached.pid"; + + fh = fopen (file, "w"); + if (fh == NULL) + { + RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + return (-1); + } + + fprintf (fh, "%i\n", (int) pid); + fclose (fh); + + return (0); +} /* }}} int write_pidfile */ + +static int remove_pidfile (void) /* {{{ */ +{ + char *file; + int status; + + file = (config_pid_file != NULL) + ? config_pid_file + : LOCALSTATEDIR "/run/rrdcached.pid"; + + status = unlink (file); + if (status == 0) + return (0); + return (errno); +} /* }}} int remove_pidfile */ + +/* + * enqueue_cache_item: + * `cache_lock' must be acquired before calling this function! + */ +static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ + queue_side_t side) +{ + RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.", + ci->file); + + if (ci == NULL) + return (-1); + + if (ci->values_num == 0) + return (0); + + if (side == HEAD) + { + if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + { + assert (ci->next == NULL); + ci->next = cache_queue_head; + cache_queue_head = ci; + + if (cache_queue_tail == NULL) + cache_queue_tail = cache_queue_head; + } + else if (cache_queue_head == ci) + { + /* do nothing */ + } + else /* enqueued, but not first entry */ + { + cache_item_t *prev; + + /* find previous entry */ + for (prev = cache_queue_head; prev != NULL; prev = prev->next) + if (prev->next == ci) + break; + assert (prev != NULL); + + /* move to the front */ + prev->next = ci->next; + ci->next = cache_queue_head; + cache_queue_head = ci; + + /* check if we need to adapt the tail */ + if (cache_queue_tail == ci) + cache_queue_tail = prev; + } + } + else /* (side == TAIL) */ + { + /* We don't move values back in the list.. */ + if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + return (0); + + assert (ci->next == NULL); + + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; + cache_queue_tail = ci; + } + + ci->flags |= CI_FLAGS_IN_QUEUE; + + return (0); +} /* }}} int enqueue_cache_item */ + +/* + * tree_callback_flush: + * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held + * while this is in progress. + */ +static gboolean tree_callback_flush (gpointer key /* {{{ */ + __attribute__((unused)), gpointer value, gpointer data) +{ + cache_item_t *ci; + time_t now; + + key = NULL; /* make compiler happy */ + + ci = (cache_item_t *) value; + now = *((time_t *) data); + + if (((now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + enqueue_cache_item (ci, TAIL); + + return (TRUE); +} /* }}} gboolean tree_callback_flush */ + +static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +{ + struct timeval now; + struct timespec next_flush; + + gettimeofday (&now, NULL); + next_flush.tv_sec = now.tv_sec + config_flush_interval; + next_flush.tv_nsec = 1000 * now.tv_usec; + + pthread_mutex_lock (&cache_lock); + while ((do_shutdown == 0) || (cache_queue_head != NULL)) + { + cache_item_t *ci; + char *file; + char **values; + int values_num; + int status; + int i; + + /* First, check if it's time to do the cache flush. */ + gettimeofday (&now, NULL); + if ((now.tv_sec > next_flush.tv_sec) + || ((now.tv_sec == next_flush.tv_sec) + && ((1000 * now.tv_usec) > next_flush.tv_nsec))) + { + time_t time_now; + + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + time_now = time (NULL); + + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now); + + /* Determine the time of the next cache flush. */ + while (next_flush.tv_sec < now.tv_sec) + next_flush.tv_sec += config_flush_interval; + } + + /* Now, check if there's something to store away. If not, wait until + * something comes in or it's time to do the cache flush. */ + if (cache_queue_head == NULL) + { + status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush); + if ((status != 0) && (status != ETIMEDOUT)) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: " + "pthread_cond_timedwait returned %i.", status); + } + } + + /* Check if a value has arrived. This may be NULL if we timed out or there + * was an interrupt such as a signal. */ + if (cache_queue_head == NULL) + continue; + + ci = cache_queue_head; + + /* copy the relevant parts */ + file = strdup (ci->file); + if (file == NULL) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed."); + continue; + } + + values = ci->values; + values_num = ci->values_num; + + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = time (NULL); + ci->flags &= ~(CI_FLAGS_IN_QUEUE); + + cache_queue_head = ci->next; + if (cache_queue_head == NULL) + cache_queue_tail = NULL; + ci->next = NULL; + + pthread_mutex_unlock (&cache_lock); + + RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", + file, values_num, (void *) values); + + status = rrd_update_r (file, NULL, values_num, (void *) values); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: " + "rrd_update_r failed with status %i.", + status); + } + + free (file); + for (i = 0; i < values_num; i++) + free (values[i]); + + pthread_mutex_lock (&cache_lock); + pthread_cond_broadcast (&flush_cond); + } /* while (do_shutdown == 0) */ + pthread_mutex_unlock (&cache_lock); + + RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting."); + + return (NULL); +} /* }}} void *queue_thread_main */ + +static int buffer_get_field (char **buffer_ret, /* {{{ */ + size_t *buffer_size_ret, char **field_ret) +{ + char *buffer; + size_t buffer_pos; + size_t buffer_size; + char *field; + size_t field_size; + int status; + + buffer = *buffer_ret; + buffer_pos = 0; + buffer_size = *buffer_size_ret; + field = *buffer_ret; + field_size = 0; + + /* This is ensured by `handle_request'. */ + assert (buffer[buffer_size - 1] == ' '); + + status = -1; + while (buffer_pos < buffer_size) + { + /* Check for end-of-field or end-of-buffer */ + if (buffer[buffer_pos] == ' ') + { + field[field_size] = 0; + field_size++; + buffer_pos++; + status = 0; + break; + } + /* Handle escaped characters. */ + else if (buffer[buffer_pos] == '\\') + { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer_pos++; + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + /* Normal operation */ + else + { + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) + return (status); + + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; + *field_ret = field; + + return (0); +} /* }}} int buffer_get_field */ + +static int flush_file (const char *filename) /* {{{ */ +{ + cache_item_t *ci; + + pthread_mutex_lock (&cache_lock); + + ci = g_tree_lookup (cache_tree, filename); + if (ci == NULL) + { + pthread_mutex_unlock (&cache_lock); + return (ENOENT); + } + + /* Enqueue at head */ + enqueue_cache_item (ci, HEAD); + pthread_cond_signal (&cache_cond); + + while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + { + ci = NULL; + + pthread_cond_wait (&flush_cond, &cache_lock); + + ci = g_tree_lookup (cache_tree, filename); + if (ci == NULL) + { + RRDD_LOG (LOG_ERR, "flush_file: Tree node went away " + "while waiting for flush."); + pthread_mutex_unlock (&cache_lock); + return (-1); + } + } + + pthread_mutex_unlock (&cache_lock); + return (0); +} /* }}} int flush_file */ + +static int handle_request_flush (int fd, /* {{{ */ + char *buffer, size_t buffer_size) +{ + char *file; + int status; + char result[4096]; + + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name."); + return (-1); + } + + status = flush_file (file); + if (status == 0) + snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file); + else if (status == ENOENT) + snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + else if (status < 0) + strncpy (result, "-1 Internal error.\n", sizeof (result)); + else + snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status); + result[sizeof (result) - 1] = 0; + + status = write (fd, result, strlen (result)); + if (status < 0) + { + status = errno; + RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error."); + return (status); + } + + return (0); +} /* }}} int handle_request_flush */ + +static int handle_request_update (int fd, /* {{{ */ + char *buffer, size_t buffer_size) +{ + char *file; + int values_num = 0; + int status; + + time_t now; + + cache_item_t *ci; + char answer[4096]; + + now = time (NULL); + + status = buffer_get_field (&buffer, &buffer_size, &file); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name."); + return (-1); + } + + pthread_mutex_lock (&cache_lock); + + ci = g_tree_lookup (cache_tree, file); + if (ci == NULL) /* {{{ */ + { + ci = (cache_item_t *) malloc (sizeof (cache_item_t)); + if (ci == NULL) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); + return (-1); + } + memset (ci, 0, sizeof (cache_item_t)); + + ci->file = strdup (file); + if (ci->file == NULL) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); + free (ci); + return (-1); + } + + ci->values = NULL; + ci->values_num = 0; + ci->last_flush_time = now; + ci->flags = CI_FLAGS_IN_TREE; + + g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); + + RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.", + ci->file); + } /* }}} */ + assert (ci != NULL); + + while (buffer_size > 0) + { + char **temp; + char *value; + + status = buffer_get_field (&buffer, &buffer_size, &value); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field."); + break; + } + + temp = (char **) realloc (ci->values, + sizeof (char *) * (ci->values_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed."); + continue; + } + ci->values = temp; + + ci->values[ci->values_num] = strdup (value); + if (ci->values[ci->values_num] == NULL) + { + RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); + continue; + } + ci->values_num++; + + values_num++; + } + + if (((now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + { + enqueue_cache_item (ci, TAIL); + pthread_cond_signal (&cache_cond); + } + + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); + answer[sizeof (answer) - 1] = 0; + + status = write (fd, answer, strlen (answer)); + if (status < 0) + { + status = errno; + RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error."); + return (status); + } + + return (0); +} /* }}} int handle_request_update */ + +static int handle_request (int fd) /* {{{ */ +{ + char buffer[4096]; + size_t buffer_size; + char *buffer_ptr; + char *command; + int status; + + status = read (fd, buffer, sizeof (buffer)); + if (status == 0) + { + return (1); + } + else if (status < 0) + { + RRDD_LOG (LOG_ERR, "handle_request: read(2) failed."); + return (-1); + } + buffer_size = status; + assert (((size_t) buffer_size) <= sizeof (buffer)); + + if (buffer[buffer_size - 1] != '\n') + { + RRDD_LOG (LOG_INFO, "handle_request: malformed request."); + return (-1); + } + + /* Accept Windows style line endings, too */ + if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r')) + { + buffer_size--; + buffer[buffer_size - 1] = '\n'; + } + + /* Place the normal field separator at the end to simplify + * `buffer_get_field's work. */ + buffer[buffer_size - 1] = ' '; + + buffer_ptr = buffer; + command = NULL; + status = buffer_get_field (&buffer_ptr, &buffer_size, &command); + if (status != 0) + { + RRDD_LOG (LOG_INFO, "handle_request: Unable parse command."); + return (-1); + } + + if (strcmp (command, "update") == 0) + { + return (handle_request_update (fd, buffer_ptr, buffer_size)); + } + else if (strcmp (command, "flush") == 0) + { + return (handle_request_flush (fd, buffer_ptr, buffer_size)); + } + else + { + RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer); + return (-1); + } +} /* }}} int handle_request */ + +static void *connection_thread_main (void *args /* {{{ */ + __attribute__((unused))) +{ + pthread_t self; + int i; + int fd; + + fd = *((int *) args); + + pthread_mutex_lock (&connetion_threads_lock); + { + pthread_t *temp; + + temp = (pthread_t *) realloc (connetion_threads, + sizeof (pthread_t) * (connetion_threads_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed."); + } + else + { + connetion_threads = temp; + connetion_threads[connetion_threads_num] = pthread_self (); + connetion_threads_num++; + } + } + pthread_mutex_unlock (&connetion_threads_lock); + + while (do_shutdown == 0) + { + struct pollfd pollfd; + int status; + + pollfd.fd = fd; + pollfd.events = POLLIN | POLLPRI; + pollfd.revents = 0; + + status = poll (&pollfd, 1, /* timeout = */ 500); + if (status == 0) /* timeout */ + continue; + else if (status < 0) /* error */ + { + status = errno; + if (status == EINTR) + continue; + RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed."); + continue; + } + + if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */ + { + close (fd); + break; + } + else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0) + { + RRDD_LOG (LOG_WARNING, "connection_thread_main: " + "poll(2) returned something unexpected: %#04hx", + pollfd.revents); + close (fd); + break; + } + + status = handle_request (fd); + if (status != 0) + { + close (fd); + break; + } + } + + self = pthread_self (); + /* Remove this thread from the connection threads list */ + pthread_mutex_lock (&connetion_threads_lock); + /* Find out own index in the array */ + for (i = 0; i < connetion_threads_num; i++) + if (pthread_equal (connetion_threads[i], self) != 0) + break; + assert (i < connetion_threads_num); + + /* Move the trailing threads forward. */ + if (i < (connetion_threads_num - 1)) + { + memmove (connetion_threads + i, + connetion_threads + i + 1, + sizeof (pthread_t) * (connetion_threads_num - i - 1)); + } + + connetion_threads_num--; + pthread_mutex_unlock (&connetion_threads_lock); + + free (args); + return (NULL); +} /* }}} void *connection_thread_main */ + +static int open_listen_socket_unix (const char *path) /* {{{ */ +{ + int fd; + struct sockaddr_un sa; + listen_socket_t *temp; + int status; + + temp = (listen_socket_t *) realloc (listen_fds, + sizeof (listen_fds[0]) * (listen_fds_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed."); + return (-1); + } + listen_fds = temp; + memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0])); + + fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); + if (fd < 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed."); + return (-1); + } + + memset (&sa, 0, sizeof (sa)); + sa.sun_family = AF_UNIX; + strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1); + + status = bind (fd, (struct sockaddr *) &sa, sizeof (sa)); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed."); + close (fd); + unlink (path); + return (-1); + } + + status = listen (fd, /* backlog = */ 10); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed."); + close (fd); + unlink (path); + return (-1); + } + + listen_fds[listen_fds_num].fd = fd; + snprintf (listen_fds[listen_fds_num].path, + sizeof (listen_fds[listen_fds_num].path) - 1, + "unix:%s", path); + listen_fds_num++; + + return (0); +} /* }}} int open_listen_socket_unix */ + +static int open_listen_socket (const char *addr) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_res; + struct addrinfo *ai_ptr; + int status; + + assert (addr != NULL); + + if (strncmp ("unix:", addr, strlen ("unix:")) == 0) + return (open_listen_socket_unix (addr + strlen ("unix:"))); + else if (addr[0] == '/') + return (open_listen_socket_unix (addr)); + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_res = NULL; + status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: " + "%s", addr, gai_strerror (status)); + return (-1); + } + + for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + int fd; + listen_socket_t *temp; + + temp = (listen_socket_t *) realloc (listen_fds, + sizeof (listen_fds[0]) * (listen_fds_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed."); + continue; + } + listen_fds = temp; + memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0])); + + fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (fd < 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed."); + continue; + } + + status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed."); + close (fd); + continue; + } + + status = listen (fd, /* backlog = */ 10); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed."); + close (fd); + return (-1); + } + + listen_fds[listen_fds_num].fd = fd; + strncpy (listen_fds[listen_fds_num].path, addr, + sizeof (listen_fds[listen_fds_num].path) - 1); + listen_fds_num++; + } /* for (ai_ptr) */ + + return (0); +} /* }}} int open_listen_socket */ + +static int close_listen_sockets (void) /* {{{ */ +{ + size_t i; + + for (i = 0; i < listen_fds_num; i++) + { + close (listen_fds[i].fd); + if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0) + unlink (listen_fds[i].path + strlen ("unix:")); + } + + free (listen_fds); + listen_fds = NULL; + listen_fds_num = 0; + + return (0); +} /* }}} int close_listen_sockets */ + +static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ +{ + struct pollfd *pollfds; + int pollfds_num; + int status; + int i; + + for (i = 0; i < config_listen_address_list_len; i++) + { + RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] " + "= %s", i, config_listen_address_list[i]); + open_listen_socket (config_listen_address_list[i]); + } + + if (config_listen_address_list_len < 1) + open_listen_socket (RRDCACHED_DEFAULT_ADDRESS); + + if (listen_fds_num < 1) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets " + "could be opened. Sorry."); + return (NULL); + } + + pollfds_num = listen_fds_num; + pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num); + if (pollfds == NULL) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); + return (NULL); + } + memset (pollfds, 0, sizeof (*pollfds) * pollfds_num); + + while (do_shutdown == 0) + { + assert (pollfds_num == ((int) listen_fds_num)); + for (i = 0; i < pollfds_num; i++) + { + pollfds[i].fd = listen_fds[i].fd; + pollfds[i].events = POLLIN | POLLPRI; + pollfds[i].revents = 0; + } + + status = poll (pollfds, pollfds_num, /* timeout = */ -1); + if (status < 1) + { + status = errno; + if (status != EINTR) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed."); + } + continue; + } + + for (i = 0; i < pollfds_num; i++) + { + int *client_sd; + struct sockaddr_storage client_sa; + socklen_t client_sa_size; + pthread_t tid; + + if (pollfds[i].revents == 0) + continue; + + if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: " + "poll(2) returned something unexpected for listen FD #%i.", + pollfds[i].fd); + continue; + } + + client_sd = (int *) malloc (sizeof (int)); + if (client_sd == NULL) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); + continue; + } + + client_sa_size = sizeof (client_sa); + *client_sd = accept (pollfds[i].fd, + (struct sockaddr *) &client_sa, &client_sa_size); + if (*client_sd < 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed."); + continue; + } + + status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main, + /* args = */ (void *) client_sd); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed."); + close (*client_sd); + free (client_sd); + continue; + } + } /* for (pollfds_num) */ + } /* while (do_shutdown == 0) */ + + close_listen_sockets (); + + pthread_mutex_lock (&connetion_threads_lock); + while (connetion_threads_num > 0) + { + pthread_t wait_for; + + wait_for = connetion_threads[0]; + + pthread_mutex_unlock (&connetion_threads_lock); + pthread_join (wait_for, /* retval = */ NULL); + pthread_mutex_lock (&connetion_threads_lock); + } + pthread_mutex_unlock (&connetion_threads_lock); + + RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting."); + + return (NULL); +} /* }}} void *listen_thread_main */ + +static int daemonize (void) /* {{{ */ +{ + pid_t child; + int status; + char *base_dir; + + /* These structures are static, because `sigaction' behaves weird if the are + * overwritten.. */ + static struct sigaction sa_int; + static struct sigaction sa_term; + static struct sigaction sa_pipe; + + child = fork (); + if (child < 0) + { + fprintf (stderr, "daemonize: fork(2) failed.\n"); + return (-1); + } + else if (child > 0) + { + return (1); + } + + /* Change into the /tmp directory. */ + base_dir = (config_base_dir != NULL) + ? config_base_dir + : "/tmp"; + status = chdir (base_dir); + if (status != 0) + { + fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir); + return (-1); + } + + /* Become session leader */ + setsid (); + + /* Open the first three file descriptors to /dev/null */ + close (2); + close (1); + close (0); + + open ("/dev/null", O_RDWR); + dup (0); + dup (0); + + /* Install signal handlers */ + memset (&sa_int, 0, sizeof (sa_int)); + sa_int.sa_handler = sig_int_handler; + sigaction (SIGINT, &sa_int, NULL); + + memset (&sa_term, 0, sizeof (sa_term)); + sa_term.sa_handler = sig_term_handler; + sigaction (SIGINT, &sa_term, NULL); + + memset (&sa_pipe, 0, sizeof (sa_pipe)); + sa_pipe.sa_handler = SIG_IGN; + sigaction (SIGPIPE, &sa_pipe, NULL); + + openlog ("rrdcached", LOG_PID, LOG_DAEMON); + + cache_tree = g_tree_new ((GCompareFunc) strcmp); + if (cache_tree == NULL) + { + RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed."); + return (-1); + } + + memset (&queue_thread, 0, sizeof (queue_thread)); + status = pthread_create (&queue_thread, /* attr = */ NULL, + queue_thread_main, /* args = */ NULL); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed."); + return (-1); + } + + write_pidfile (); + + return (0); +} /* }}} int daemonize */ + +static int cleanup (void) /* {{{ */ +{ + RRDD_LOG (LOG_DEBUG, "cleanup ()"); + + do_shutdown++; + + RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread.."); + pthread_cond_signal (&cache_cond); + pthread_join (queue_thread, /* return = */ NULL); + RRDD_LOG (LOG_DEBUG, "cleanup: done"); + + remove_pidfile (); + + closelog (); + + return (0); +} /* }}} int cleanup */ + +static int read_options (int argc, char **argv) /* {{{ */ +{ + int option; + int status = 0; + + while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1) + { + switch (option) + { + case 'l': + { + char **temp; + + temp = (char **) realloc (config_listen_address_list, + sizeof (char *) * (config_listen_address_list_len + 1)); + if (temp == NULL) + { + fprintf (stderr, "read_options: realloc failed.\n"); + return (2); + } + config_listen_address_list = temp; + + temp[config_listen_address_list_len] = strdup (optarg); + if (temp[config_listen_address_list_len] == NULL) + { + fprintf (stderr, "read_options: strdup failed.\n"); + return (2); + } + config_listen_address_list_len++; + } + break; + + case 'f': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_flush_interval = temp; + else + { + fprintf (stderr, "Invalid flush interval: %s\n", optarg); + status = 3; + } + } + break; + + case 'w': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_write_interval = temp; + else + { + fprintf (stderr, "Invalid write interval: %s\n", optarg); + status = 2; + } + } + break; + + case 'b': + { + size_t len; + + if (config_base_dir != NULL) + free (config_base_dir); + config_base_dir = strdup (optarg); + if (config_base_dir == NULL) + { + fprintf (stderr, "read_options: strdup failed.\n"); + return (3); + } + + len = strlen (config_base_dir); + while ((len > 0) && (config_base_dir[len - 1] == '/')) + { + config_base_dir[len - 1] = 0; + len--; + } + + if (len < 1) + { + fprintf (stderr, "Invalid base directory: %s\n", optarg); + return (4); + } + } + break; + + case 'p': + { + if (config_pid_file != NULL) + free (config_pid_file); + config_pid_file = strdup (optarg); + if (config_pid_file == NULL) + { + fprintf (stderr, "read_options: strdup failed.\n"); + return (3); + } + } + break; + + case 'h': + case '?': + printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" + "\n" + "Usage: rrdcached [options]\n" + "\n" + "Valid options are:\n" + " -l
Socket address to listen to.\n" + " -w Interval in which to write data.\n" + " -f Interval in which to flush dead data.\n" + " -p Location of the PID-file.\n" + " -b Base directory to change to.\n" + "\n" + "For more information and a detailed description of all options " + "please refer\n" + "to the rrdcached(1) manual page.\n", + VERSION); + status = -1; + break; + } /* switch (option) */ + } /* while (getopt) */ + + return (status); +} /* }}} int read_options */ + +int main (int argc, char **argv) +{ + int status; + + status = read_options (argc, argv); + if (status != 0) + { + if (status < 0) + status = 0; + return (status); + } + + status = daemonize (); + if (status == 1) + { + struct sigaction sigchld; + + memset (&sigchld, 0, sizeof (sigchld)); + sigchld.sa_handler = SIG_IGN; + sigaction (SIGCHLD, &sigchld, NULL); + + return (0); + } + else if (status != 0) + { + fprintf (stderr, "daemonize failed, exiting.\n"); + return (1); + } + + listen_thread_main (NULL); + + cleanup (); + + return (0); +} /* int main */ + +/* + * vim: set sw=2 sts=2 ts=8 et fdm=marker : + */ diff --git a/src/rrd_fetch.c b/src/rrd_fetch.c index 4ea2eb1..26c3324 100644 --- a/src/rrd_fetch.c +++ b/src/rrd_fetch.c @@ -53,6 +53,7 @@ *****************************************************************************/ #include "rrd_tool.h" +#include "rrd_client.h" #include "rrd_is_thread_safe.h" /*#define DEBUG*/ @@ -72,6 +73,7 @@ int rrd_fetch( long step_tmp = 1; time_t start_tmp = 0, end_tmp = 0; const char *cf; + char *daemon = NULL; rrd_time_value_t start_tv, end_tv; char *parsetime_error = NULL; @@ -79,6 +81,7 @@ int rrd_fetch( {"resolution", required_argument, 0, 'r'}, {"start", required_argument, 0, 's'}, {"end", required_argument, 0, 'e'}, + {"daemon", required_argument, 0, 'd'}, {0, 0, 0, 0} }; @@ -93,7 +96,7 @@ int rrd_fetch( int option_index = 0; int opt; - opt = getopt_long(argc, argv, "r:s:e:", long_options, &option_index); + opt = getopt_long(argc, argv, "r:s:e:d:", long_options, &option_index); if (opt == EOF) break; @@ -114,6 +117,18 @@ int rrd_fetch( case 'r': step_tmp = atol(optarg); break; + + case 'd': + if (daemon != NULL) + free (daemon); + daemon = strdup (optarg); + if (daemon == NULL) + { + rrd_set_error ("strdup failed."); + return (-1); + } + break; + case '?': rrd_set_error("unknown option '-%c'", optopt); return (-1); @@ -151,10 +166,26 @@ int rrd_fetch( return -1; } + if (daemon == NULL) + { + char *temp; + + temp = getenv (ENV_RRDCACHED_ADDRESS); + if (temp != NULL) + { + daemon = strdup (temp); + if (daemon == NULL) + { + rrd_set_error("strdup failed."); + return (-1); + } + } + } + cf = argv[optind + 1]; - if (rrd_fetch_r(argv[optind], cf, start, end, step, ds_cnt, ds_namv, data) - != 0) + if (rrd_fetch_r(argv[optind], cf, start, end, step, daemon, ds_cnt, + ds_namv, data) != 0) return (-1); return (0); } @@ -167,19 +198,36 @@ int rrd_fetch_r( * will be changed to represent reality */ unsigned long *step, /* which stepsize do you want? * will be changed to represent reality */ + const char *daemon, unsigned long *ds_cnt, /* number of data sources in file */ char ***ds_namv, /* names of data_sources */ rrd_value_t **data) { /* two dimensional array containing the data */ enum cf_en cf_idx; + int status; if ((int) (cf_idx = cf_conv(cf)) == -1) { return -1; } - return (rrd_fetch_fn - (filename, cf_idx, start, end, step, ds_cnt, ds_namv, data)); -} + if (daemon != NULL) + { + status = rrdc_connect (daemon); + if (status != 0) + { + rrd_set_error ("rrdc_connect failed with status %i.", status); + return (-1); + } + } + + status = rrd_fetch_fn (filename, cf_idx, start, end, step, + (daemon == NULL) ? 0 : 1, + ds_cnt, ds_namv, data); + + rrdc_disconnect (); + + return (status); +} /* int rrd_fetch_r */ int rrd_fetch_fn( const char *filename, /* name of the rrd */ @@ -189,6 +237,7 @@ int rrd_fetch_fn( * will be changed to represent reality */ unsigned long *step, /* which stepsize do you want? * will be changed to represent reality */ + int use_rrdcached, unsigned long *ds_cnt, /* number of data sources in file */ char ***ds_namv, /* names of data_sources */ rrd_value_t **data) @@ -208,6 +257,18 @@ int rrd_fetch_fn( rrd_value_t *data_ptr; unsigned long rows; + if (use_rrdcached) + { + int status; + + status = rrdc_flush (filename); + if (status != 0) + { + rrd_set_error ("rrdc_flush failed with status %i.", status); + return (-1); + } + } + #ifdef DEBUG fprintf(stderr, "Entered rrd_fetch_fn() searching for the best match\n"); fprintf(stderr, "Looking for: start %10lu end %10lu step %5lu\n", diff --git a/src/rrd_graph.c b/src/rrd_graph.c index e671f78..dbca157 100644 --- a/src/rrd_graph.c +++ b/src/rrd_graph.c @@ -26,6 +26,7 @@ #endif #include "rrd_graph.h" +#include "rrd_client.h" /* some constant definitions */ @@ -302,6 +303,13 @@ int im_free( if (im == NULL) return 0; + + if (im->use_rrdcached) + { + rrdc_disconnect (); + im->use_rrdcached = 0; + } + for (i = 0; i < (unsigned) im->gdes_c; i++) { if (im->gdes[i].data_first) { /* careful here, because a single pointer can occur several times */ @@ -829,6 +837,7 @@ int data_fetch( &im->gdes[i].start, &im->gdes[i].end, &ft_step, + im->use_rrdcached ? 1 : 0, &im->gdes[i].ds_cnt, &im->gdes[i].ds_namv, &im->gdes[i].data)) == -1) { @@ -3720,6 +3729,7 @@ void rrd_graph_init( im->grinfo_current = (rrd_info_t *) NULL; im->imgformat = IF_PNG; im->imginfo = NULL; + im->use_rrdcached = 0; im->lazy = 0; im->logarithmic = 0; im->maxval = DNAN; @@ -3838,6 +3848,7 @@ void rrd_graph_options( { "watermark", required_argument, 0, 'W'}, { "alt-y-mrtg", no_argument, 0, 1000}, /* this has no effect it is just here to save old apps from crashing when they use it */ { "pango-markup", no_argument, 0, 'P'}, + { "daemon", required_argument, 0, 'd'}, { 0, 0, 0, 0} }; /* *INDENT-ON* */ @@ -3852,7 +3863,7 @@ void rrd_graph_options( int col_start, col_end; opt = getopt_long(argc, argv, - "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kP", + "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kPd:", long_options, &option_index); if (opt == EOF) break; @@ -4196,6 +4207,25 @@ void rrd_graph_options( strncpy(im->watermark, optarg, 100); im->watermark[99] = '\0'; break; + case 'd': + { + int status; + if (im->use_rrdcached) + { + rrd_set_error ("You cannot specify --daemon " + "more than once."); + return; + } + status = rrdc_connect (optarg); + if (status != 0) + { + rrd_set_error ("rrdc_connect(%s) failed with status %i.", + optarg, status); + return; + } + im->use_rrdcached = 1; + break; + } case '?': if (optopt != 0) rrd_set_error("unknown option '%c'", optopt); @@ -4203,6 +4233,26 @@ void rrd_graph_options( rrd_set_error("unknown option '%s'", argv[optind - 1]); return; } + } /* while (1) */ + + if (im->use_rrdcached == 0) + { + char *temp; + + temp = getenv (ENV_RRDCACHED_ADDRESS); + if (temp != NULL) + { + int status; + + status = rrdc_connect (temp); + if (status != 0) + { + rrd_set_error ("rrdc_connect(%s) failed with status %i.", + temp, status); + return; + } + im->use_rrdcached = 1; + } } if (im->logarithmic && im->minval <= 0) { diff --git a/src/rrd_graph.h b/src/rrd_graph.h index 2b1c05b..c21f356 100644 --- a/src/rrd_graph.h +++ b/src/rrd_graph.h @@ -210,6 +210,7 @@ typedef struct image_desc_t { char *imginfo; /* construct an ]\n" "\t\ttime|N:value[:value...]\n\n" "\t\tat-time@value[:value...]\n\n" "\t\t[ time:value[:value...] ..]\n\n"); @@ -104,7 +105,7 @@ void PrintUsage( N_("* updatev - a verbose version of update\n" "\treturns information about values, RRAs, and datasources updated\n\n" "\trrdtool updatev filename\n" - "\t\t--template|-t ds-name:ds-name:...\n" + "\t\t[--template|-t ds-name:ds-name:...]\n" "\t\ttime|N:value[:value...]\n\n" "\t\tat-time@value[:value...]\n\n" "\t\t[ time:value[:value...] ..]\n\n"); @@ -113,7 +114,8 @@ void PrintUsage( N_("* fetch - fetch data out of an RRD\n\n" "\trrdtool fetch filename.rrd CF\n" "\t\t[-r|--resolution resolution]\n" - "\t\t[-s|--start start] [-e|--end end]\n\n"); + "\t\t[-s|--start start] [-e|--end end]\n" + "\t\t[--daemon
]\n\n"); /* break up very large strings (help_graph, help_tune) for ISO C89 compliance*/ @@ -132,7 +134,7 @@ void PrintUsage( "\t\t[-h|--height pixels] [-o|--logarithmic]\n" "\t\t[-u|--upper-limit value] [-z|--lazy]\n" "\t\t[-l|--lower-limit value] [-r|--rigid]\n" - "\t\t[-g|--no-legend]\n" + "\t\t[-g|--no-legend] [--daemon
]\n" "\t\t[-F|--force-rules-legend]\n" "\t\t[-j|--only-graph]\n"); const char *help_graph2 = N_("\t\t[-n|--font FONTTAG:size:font]\n" diff --git a/src/rrd_tool.h b/src/rrd_tool.h index 0be66e4..63359b6 100644 --- a/src/rrd_tool.h +++ b/src/rrd_tool.h @@ -77,15 +77,15 @@ extern "C" { int rrd_create_fn( const char *file_name, rrd_t *rrd); - int rrd_fetch_fn( - const char *filename, - enum cf_en cf_idx, - time_t *start, - time_t *end, - unsigned long *step, - unsigned long *ds_cnt, - char ***ds_namv, - rrd_value_t **data); + int rrd_fetch_fn (const char *filename, + enum cf_en cf_idx, + time_t *start, + time_t *end, + unsigned long *step, + int use_rrdcached, + unsigned long *ds_cnt, + char ***ds_namv, + rrd_value_t **data); #define RRD_READONLY (1<<0) #define RRD_READWRITE (1<<1) diff --git a/src/rrd_update.c b/src/rrd_update.c index fe6aea4..5536c3c 100644 --- a/src/rrd_update.c +++ b/src/rrd_update.c @@ -1,6 +1,7 @@ /***************************************************************************** * RRDtool 1.3.0 Copyright by Tobi Oetiker, 1997-2008 + * Copyright by Florian Forster, 2008 ***************************************************************************** * rrd_update.c RRD Update Function ***************************************************************************** @@ -23,6 +24,8 @@ #include "rrd_is_thread_safe.h" #include "unused.h" +#include "rrd_client.h" + #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) /* * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty @@ -374,18 +377,20 @@ int rrd_update( { struct option long_options[] = { {"template", required_argument, 0, 't'}, + {"daemon", required_argument, 0, 'd'}, {0, 0, 0, 0} }; int option_index = 0; int opt; char *tmplt = NULL; int rc = -1; + char *daemon = NULL; optind = 0; opterr = 0; /* initialize getopt */ while (1) { - opt = getopt_long(argc, argv, "t:", long_options, &option_index); + opt = getopt_long(argc, argv, "t:d:", long_options, &option_index); if (opt == EOF) break; @@ -395,6 +400,17 @@ int rrd_update( tmplt = strdup(optarg); break; + case 'd': + if (daemon != NULL) + free (daemon); + daemon = strdup (optarg); + if (daemon == NULL) + { + rrd_set_error("strdup failed."); + goto out; + } + break; + case '?': rrd_set_error("unknown option '%s'", argv[optind - 1]); goto out; @@ -407,10 +423,71 @@ int rrd_update( goto out; } + if ((tmplt != NULL) && (daemon != NULL)) + { + rrd_set_error("The caching daemon cannot be used together with " + "templates yet."); + goto out; + } + + if ((tmplt == NULL) && (daemon == NULL)) + { + char *temp; + + temp = getenv (ENV_RRDCACHED_ADDRESS); + if (temp != NULL) + { + daemon = strdup (temp); + if (daemon == NULL) + { + rrd_set_error("strdup failed."); + goto out; + } + } + } + + if (daemon != NULL) + { + int status; + + status = rrdc_connect (daemon); + if (status != 0) + { + rrd_set_error("Unable to connect to daemon: %s", + (status < 0) + ? "Internal error" + : rrd_strerror (status)); + goto out; + } + + status = rrdc_update (/* file = */ argv[optind], + /* values_num = */ argc - optind - 1, + /* values = */ (void *) (argv + optind + 1)); + if (status != 0) + { + rrd_set_error("Failed sending the values to the daemon: %s", + (status < 0) + ? "Internal error" + : rrd_strerror (status)); + } + + rrdc_disconnect (); + goto out; + } /* if (daemon != NULL) */ + rc = rrd_update_r(argv[optind], tmplt, argc - optind - 1, (const char **) (argv + optind + 1)); out: - free(tmplt); + if (tmplt != NULL) + { + free(tmplt); + tmplt = NULL; + } + if (daemon != NULL) + { + free (daemon); + daemon = NULL; + } return rc; }