src/rrd_daemon.c: Add caching daemon.
authorFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 22 Jun 2008 17:46:08 +0000 (19:46 +0200)
committerFlorian Forster <octo@leeloo.home.verplant.org>
Sun, 22 Jun 2008 17:46:08 +0000 (19:46 +0200)
Add the daemon from the RRDd project.

doc/Makefile.am
doc/rrdd.pod [new file with mode: 0644]
src/Makefile.am
src/rrd_daemon.c [new file with mode: 0644]

index 16fd617..dd898ce 100644 (file)
@@ -10,7 +10,7 @@ CLEANFILES = *.1 *.html *.txt *-dircache RRD?.pod *.pdf *~ core *itemcache *.rej
 
 POD = bin_dec_hex.pod        rrddump.pod            rrdgraph_examples.pod  rrdrestore.pod         rrdupdate.pod  \
       cdeftutorial.pod       rrdfetch.pod           rrdgraph_graph.pod     rrdthreads.pod         rrdxport.pod   \
-      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod                           \
+      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod            rrdd.pod       \
       rrd-beginners.pod      rrdinfo.pod            rrdtune.pod            rrdbuild.pod                          \
       rrdcgi.pod             rrdgraph.pod           rrdlast.pod            rrdlastupdate.pod                     \
       rrdcreate.pod          rrdgraph_data.pod      rrdresize.pod          rrdtutorial.pod                       
diff --git a/doc/rrdd.pod b/doc/rrdd.pod
new file mode 100644 (file)
index 0000000..e0c322a
--- /dev/null
@@ -0,0 +1,62 @@
+=pod
+
+=head1 NAME
+
+RRDd - Data caching daemon for rrdtool
+
+=head1 SYNOPSIS
+
+B<rrdd> [B<-l> I<address>] [B<-w> I<timeout>] [B<-f> I<timeout>]
+
+=head1 DESCRIPTION
+
+RRDd is a daemon that receives updates to existing RRD files, accumulates them
+and, if enough have been received or a defined time has passed, writes the
+updates to the RRD file. A I<flush> command may be used to force writing of
+values to disk, so that graphing facilities and similar can work with
+up-to-date data.
+
+=head1 OPTIONS
+
+=over 4
+
+=item B<-l> I<address>
+
+Tells the daemon to bind to I<address> and accept incoming connections on that
+socket. If I<address> begins with C<unix:>, everthing following that prefix is
+interpreted as the path to a UNIX domain socket. Otherwise the address or node
+name are resolved using L<getaddrinfo(3)>.
+
+=item B<-w> I<timeout>
+
+Data is written to disk every I<timeout> seconds.
+
+=item B<-f> I<timeout>
+
+Every I<timeout> seconds the entire cache is searched for old values which are
+written to disk. This only concerns files to which updates have stopped, so
+setting this to a high value, such as 3600 seconds, is acceptable in most
+cases.
+
+=head1 BUGS
+
+=over 4
+
+=item 
+
+Uses way too much CPU time after running for a while.
+
+=item
+
+Base directory is currently hard coded. The daemon will chdir to C</tmp/>.
+
+=back
+
+=head1 SEE ALSO
+
+L<rrdtool(1)>, L<rrdgraph(1)>
+
+=head1 AUHOR
+
+RRDd and this manual page have been written by Florian Forster
+E<lt>octoE<nbsp>atE<nbsp>verplant.orgE<gt>.
index 5311592..d7f82ec 100644 (file)
@@ -82,7 +82,7 @@ librrd_th_la_LIBADD          = $(ALL_LIBS)
 
 include_HEADERS        = rrd.h rrd_format.h
 
-bin_PROGRAMS   = rrdtool rrdupdate
+bin_PROGRAMS   = rrdtool rrdupdate rrdd
 
 if BUILD_RRDCGI
 bin_PROGRAMS += rrdcgi
@@ -98,6 +98,11 @@ rrdtool_SOURCES = rrd_tool.c
 rrdtool_DEPENDENCIES = librrd.la
 rrdtool_LDADD  = librrd.la
 
+rrdd_SOURCES = rrd_daemon.c
+rrdd_DEPENDENCIES = librrd.la
+rrdd_CPPFLAGS = -DVERSION='"$(VERSION)"'
+rrdd_LDADD = librrd.la
+
 # strftime is here because we do not usually need it. unices have propper
 # iso date support
 EXTRA_DIST= strftime.c strftime.h \
diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c
new file mode 100644 (file)
index 0000000..3af33dc
--- /dev/null
@@ -0,0 +1,1057 @@
+/**
+ * RRDTool - src/rrd_daemon.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+/*
+ * First tell the compiler to stick to the C99 and POSIX standards as close as
+ * possible.
+ */
+#ifndef __STRICT_ANSI__ /* {{{ */
+# define __STRICT_ANSI__
+#endif
+
+#ifndef _ISOC99_SOURCE
+# define _ISOC99_SOURCE
+#endif
+
+#ifdef _POSIX_C_SOURCE
+# undef _POSIX_C_SOURCE
+#endif
+#define _POSIX_C_SOURCE 200112L
+
+/* Single UNIX needed for strdup. */
+#ifdef _XOPEN_SOURCE
+# undef _XOPEN_SOURCE
+#endif
+#define _XOPEN_SOURCE 500
+
+#ifndef _REENTRANT
+# define _REENTRANT
+#endif
+
+#ifndef _THREAD_SAFE
+# define _THREAD_SAFE
+#endif
+
+#ifdef _GNU_SOURCE
+# undef _GNU_SOURCE
+#endif
+/* }}} */
+
+/*
+ * Now for some includes..
+ */
+#include "rrd.h" /* {{{ */
+#include "rrd_client.h"
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
+#include <poll.h>
+#include <syslog.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <glib-2.0/glib.h>
+/* }}} */
+
+#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
+
+#ifndef __GNUC__
+# define __attribute__(x) /**/
+#endif
+
+#if 0
+/* FIXME: I don't want to declare strdup myself! */
+extern char *strdup (const char *s);
+#endif
+
+/*
+ * Types
+ */
+struct listen_socket_s
+{
+  int fd;
+  char path[PATH_MAX + 1];
+};
+typedef struct listen_socket_s listen_socket_t;
+
+struct cache_item_s;
+typedef struct cache_item_s cache_item_t;
+struct cache_item_s
+{
+  char *file;
+  char **values;
+  int values_num;
+  time_t last_flush_time;
+#define CI_FLAGS_IN_TREE  0x01
+#define CI_FLAGS_IN_QUEUE 0x02
+  int flags;
+
+  cache_item_t *next;
+};
+
+/*
+ * Variables
+ */
+static listen_socket_t *listen_fds = NULL;
+static size_t listen_fds_num = 0;
+
+static int do_shutdown = 0;
+
+static pthread_t queue_thread;
+
+static pthread_t *connetion_threads = NULL;
+static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static int connetion_threads_num = 0;
+
+/* Cache stuff */
+static GTree          *cache_tree = NULL;
+static cache_item_t   *cache_queue_head = NULL;
+static cache_item_t   *cache_queue_tail = NULL;
+static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
+
+static int config_write_interval = 300;
+static int config_flush_interval = 3600;
+
+static char **config_listen_address_list = NULL;
+static int config_listen_address_list_len = 0;
+
+/* 
+ * Functions
+ */
+static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_int_handler */
+
+static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_term_handler */
+
+/*
+ * enqueue_cache_item:
+ * `cache_lock' must be acquired before calling this function!
+ */
+static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
+{
+  RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
+      ci->file);
+
+  if (ci == NULL)
+    return (-1);
+
+  if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+    return (-1);
+
+  assert (ci->next == NULL);
+
+  if (cache_queue_tail == NULL)
+    cache_queue_head = ci;
+  else
+    cache_queue_tail->next = ci;
+  cache_queue_tail = ci;
+
+  return (0);
+} /* }}} int enqueue_cache_item */
+
+/*
+ * tree_callback_flush:
+ * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
+ * while this is in progress.
+ */
+static gboolean tree_callback_flush (gpointer key /* {{{ */
+    __attribute__((unused)), gpointer value, gpointer data)
+{
+  cache_item_t *ci;
+  time_t now;
+
+  key = NULL; /* make compiler happy */
+
+  ci = (cache_item_t *) value;
+  now = *((time_t *) data);
+
+  if (((now - ci->last_flush_time) >= config_write_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
+    enqueue_cache_item (ci);
+
+  return (TRUE);
+} /* }}} gboolean tree_callback_flush */
+
+static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct timeval now;
+  struct timespec next_flush;
+
+  gettimeofday (&now, NULL);
+  next_flush.tv_sec = now.tv_sec + config_flush_interval;
+  next_flush.tv_nsec = 1000 * now.tv_usec;
+
+  pthread_mutex_lock (&cache_lock);
+  while ((do_shutdown == 0) || (cache_queue_head != NULL))
+  {
+    cache_item_t *ci;
+    char *file;
+    char **values;
+    int values_num;
+    int status;
+    int i;
+
+    /* First, check if it's time to do the cache flush. */
+    gettimeofday (&now, NULL);
+    if ((now.tv_sec > next_flush.tv_sec)
+        || ((now.tv_sec == next_flush.tv_sec)
+          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
+    {
+      time_t time_now;
+
+      /* Pass the current time as user data so that we don't need to call
+       * `time' for each node. */
+      time_now = time (NULL);
+
+      g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
+
+      /* Determine the time of the next cache flush. */
+      while (next_flush.tv_sec < now.tv_sec)
+        next_flush.tv_sec += config_flush_interval;
+    }
+
+    /* Now, check if there's something to store away. If not, wait until
+     * something comes in or it's time to do the cache flush. */
+    if (cache_queue_head == NULL)
+    {
+      struct timespec timeout;
+
+      timeout.tv_sec = next_flush.tv_sec - now.tv_sec;
+      if (next_flush.tv_nsec < (1000 * now.tv_usec))
+      {
+        timeout.tv_sec--;
+        timeout.tv_nsec = 1000000000 + next_flush.tv_nsec
+          - (1000 * now.tv_usec);
+      }
+      else
+      {
+        timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec);
+      }
+
+      pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout);
+    }
+
+    /* Check if a value has arrived. This may be NULL if we timed out or there
+     * was an interrupt such as a signal. */
+    if (cache_queue_head == NULL)
+      continue;
+
+    ci = cache_queue_head;
+
+    /* copy the relevant parts */
+    file = strdup (ci->file);
+    if (file == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
+      continue;
+    }
+
+    values = ci->values;
+    values_num = ci->values_num;
+
+    ci->values = NULL;
+    ci->values_num = 0;
+
+    ci->last_flush_time = time (NULL);
+    ci->flags &= ~(CI_FLAGS_IN_QUEUE);
+
+    cache_queue_head = ci->next;
+    if (cache_queue_head == NULL)
+      cache_queue_tail = NULL;
+    ci->next = NULL;
+
+    pthread_mutex_unlock (&cache_lock);
+
+    RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
+        file, values_num, (void *) values);
+
+    status = rrd_update_r (file, NULL, values_num, (void *) values);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: "
+          "rrd_update_r failed with status %i.",
+          status);
+    }
+
+    free (file);
+    for (i = 0; i < values_num; i++)
+      free (values[i]);
+
+    pthread_mutex_lock (&cache_lock);
+  } /* while (do_shutdown == 0) */
+  pthread_mutex_unlock (&cache_lock);
+
+  RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
+
+  return (NULL);
+} /* }}} void *queue_thread_main */
+
+static int handle_request_update (int fd, /* {{{ */
+    char *buffer, int buffer_size)
+{
+  char *file;
+  char *value;
+  char *buffer_ptr;
+  int values_num = 0;
+  int status;
+
+  time_t now;
+
+  cache_item_t *ci;
+  char answer[4096];
+
+  now = time (NULL);
+
+  RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
+      fd, (void *) buffer, buffer_size);
+
+  buffer_ptr = buffer;
+
+  file = buffer_ptr;
+  buffer_ptr += strlen (file) + 1;
+
+  pthread_mutex_lock (&cache_lock);
+
+  ci = g_tree_lookup (cache_tree, file);
+  if (ci == NULL)
+  {
+    ci = (cache_item_t *) malloc (sizeof (cache_item_t));
+    if (ci == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+      return (-1);
+    }
+    memset (ci, 0, sizeof (cache_item_t));
+
+    ci->file = strdup (file);
+    if (ci->file == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+      free (ci);
+      return (-1);
+    }
+
+    ci->values = NULL;
+    ci->values_num = 0;
+    ci->last_flush_time = now;
+    ci->flags = CI_FLAGS_IN_TREE;
+
+    g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
+
+    RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
+        ci->file);
+  }
+  assert (ci != NULL);
+
+  while (*buffer_ptr != 0)
+  {
+    char **temp;
+
+    value = buffer_ptr;
+    buffer_ptr += strlen (value) + 1;
+
+    temp = (char **) realloc (ci->values,
+        sizeof (char *) * (ci->values_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
+      continue;
+    }
+    ci->values = temp;
+
+    ci->values[ci->values_num] = strdup (value);
+    if (ci->values[ci->values_num] == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+      continue;
+    }
+    ci->values_num++;
+
+    values_num++;
+  }
+
+  if (((now - ci->last_flush_time) >= config_write_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
+  {
+    enqueue_cache_item (ci);
+    pthread_cond_signal (&cache_cond);
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+
+  snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
+  answer[sizeof (answer) - 1] = 0;
+
+  status = write (fd, answer, sizeof (answer));
+  if (status < 0)
+  {
+    status = errno;
+    RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
+    return (status);
+  }
+
+  return (0);
+} /* }}} int handle_request_update */
+
+static int handle_request (int fd) /* {{{ */
+{
+  char buffer[4096];
+  int buffer_size;
+
+  RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
+
+  buffer_size = read (fd, buffer, sizeof (buffer));
+  if (buffer_size < 1)
+  {
+    RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
+    return (-1);
+  }
+  assert (((size_t) buffer_size) <= sizeof (buffer));
+
+  if ((buffer[buffer_size - 2] != 0)
+      || (buffer[buffer_size - 1] != 0))
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
+    return (-1);
+  }
+
+  /* fields in the buffer a separated by null bytes. */
+  if (strcmp (buffer, "update") == 0)
+  {
+    int offset = strlen ("update") + 1;
+    return (handle_request_update (fd, buffer + offset,
+          buffer_size - offset));
+  }
+  else
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
+    return (-1);
+  }
+} /* }}} int handle_request */
+
+static void *connection_thread_main (void *args /* {{{ */
+    __attribute__((unused)))
+{
+  pthread_t self;
+  int i;
+  int fd;
+  
+  fd = *((int *) args);
+
+  RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
+      "connetion_threads[]..");
+  pthread_mutex_lock (&connetion_threads_lock);
+  {
+    pthread_t *temp;
+
+    temp = (pthread_t *) realloc (connetion_threads,
+        sizeof (pthread_t) * (connetion_threads_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
+    }
+    else
+    {
+      connetion_threads = temp;
+      connetion_threads[connetion_threads_num] = pthread_self ();
+      connetion_threads_num++;
+    }
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+  RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
+
+  while (do_shutdown == 0)
+  {
+    struct pollfd pollfd;
+    int status;
+
+    pollfd.fd = fd;
+    pollfd.events = POLLIN | POLLPRI;
+    pollfd.revents = 0;
+
+    status = poll (&pollfd, 1, /* timeout = */ 500);
+    if (status == 0) /* timeout */
+      continue;
+    else if (status < 0) /* error */
+    {
+      status = errno;
+      if (status == EINTR)
+        continue;
+      RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
+      continue;
+    }
+
+    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
+    {
+      RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
+          "poll(2) returned POLLHUP.");
+      close (fd);
+      break;
+    }
+    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
+    {
+      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
+          "poll(2) returned something unexpected: %#04hx",
+          pollfd.revents);
+      close (fd);
+      break;
+    }
+
+    status = handle_request (fd);
+    if (status != 0)
+    {
+      close (fd);
+      break;
+    }
+  }
+
+  self = pthread_self ();
+  /* Remove this thread from the connection threads list */
+  pthread_mutex_lock (&connetion_threads_lock);
+  /* Find out own index in the array */
+  for (i = 0; i < connetion_threads_num; i++)
+    if (pthread_equal (connetion_threads[i], self) != 0)
+      break;
+  assert (i < connetion_threads_num);
+
+  /* Move the trailing threads forward. */
+  if (i < (connetion_threads_num - 1))
+  {
+    memmove (connetion_threads + i,
+        connetion_threads + i + 1,
+        sizeof (pthread_t) * (connetion_threads_num - i - 1));
+  }
+
+  connetion_threads_num--;
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  free (args);
+  return (NULL);
+} /* }}} void *connection_thread_main */
+
+static int open_listen_socket_unix (const char *path) /* {{{ */
+{
+  int fd;
+  struct sockaddr_un sa;
+  listen_socket_t *temp;
+  int status;
+
+  temp = (listen_socket_t *) realloc (listen_fds,
+      sizeof (listen_fds[0]) * (listen_fds_num + 1));
+  if (temp == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
+    return (-1);
+  }
+  listen_fds = temp;
+  memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+  fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
+  if (fd < 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
+    return (-1);
+  }
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sun_family = AF_UNIX;
+  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
+
+  status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+
+  status = listen (fd, /* backlog = */ 10);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+  
+  listen_fds[listen_fds_num].fd = fd;
+  snprintf (listen_fds[listen_fds_num].path,
+      sizeof (listen_fds[listen_fds_num].path) - 1,
+      "unix:%s", path);
+  listen_fds_num++;
+
+  return (0);
+} /* }}} int open_listen_socket_unix */
+
+static int open_listen_socket (const char *addr) /* {{{ */
+{
+  struct addrinfo ai_hints;
+  struct addrinfo *ai_res;
+  struct addrinfo *ai_ptr;
+  int status;
+
+  assert (addr != NULL);
+
+  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
+    return (open_listen_socket_unix (addr + strlen ("unix:")));
+  else if (addr[0] == '/')
+    return (open_listen_socket_unix (addr));
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+
+  ai_res = NULL;
+  status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
+        "%s", addr, gai_strerror (status));
+    return (-1);
+  }
+
+  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    int fd;
+    listen_socket_t *temp;
+
+    temp = (listen_socket_t *) realloc (listen_fds,
+        sizeof (listen_fds[0]) * (listen_fds_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
+      continue;
+    }
+    listen_fds = temp;
+    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
+      continue;
+    }
+
+    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
+      close (fd);
+      continue;
+    }
+
+    status = listen (fd, /* backlog = */ 10);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
+      close (fd);
+      return (-1);
+    }
+
+    listen_fds[listen_fds_num].fd = fd;
+    strncpy (listen_fds[listen_fds_num].path, addr,
+        sizeof (listen_fds[listen_fds_num].path) - 1);
+    listen_fds_num++;
+  } /* for (ai_ptr) */
+
+  return (0);
+} /* }}} int open_listen_socket */
+
+static int close_listen_sockets (void) /* {{{ */
+{
+  size_t i;
+
+  for (i = 0; i < listen_fds_num; i++)
+  {
+    close (listen_fds[i].fd);
+    if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
+      unlink (listen_fds[i].path + strlen ("unix:"));
+  }
+
+  free (listen_fds);
+  listen_fds = NULL;
+  listen_fds_num = 0;
+
+  return (0);
+} /* }}} int close_listen_sockets */
+
+static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct pollfd *pollfds;
+  int pollfds_num;
+  int status;
+  int i;
+
+  for (i = 0; i < config_listen_address_list_len; i++)
+  {
+    RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
+        "= %s", i, config_listen_address_list[i]);
+    open_listen_socket (config_listen_address_list[i]);
+  }
+
+  if (config_listen_address_list_len < 1)
+    open_listen_socket (RRDD_SOCK_PATH);
+
+  if (listen_fds_num < 1)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
+        "could be opened. Sorry.");
+    return (NULL);
+  }
+
+  pollfds_num = listen_fds_num;
+  pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
+  if (pollfds == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+    return (NULL);
+  }
+  memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
+
+  while (do_shutdown == 0)
+  {
+    assert (pollfds_num == ((int) listen_fds_num));
+    for (i = 0; i < pollfds_num; i++)
+    {
+      pollfds[i].fd = listen_fds[i].fd;
+      pollfds[i].events = POLLIN | POLLPRI;
+      pollfds[i].revents = 0;
+    }
+
+    status = poll (pollfds, pollfds_num, /* timeout = */ -1);
+    if (status < 1)
+    {
+      status = errno;
+      if (status != EINTR)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
+      }
+      continue;
+    }
+
+    for (i = 0; i < pollfds_num; i++)
+    {
+      int *client_sd;
+      struct sockaddr_storage client_sa;
+      socklen_t client_sa_size;
+      pthread_t tid;
+
+      if (pollfds[i].revents == 0)
+        continue;
+
+      if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: "
+            "poll(2) returned something unexpected for listen FD #%i.",
+            pollfds[i].fd);
+        continue;
+      }
+
+      client_sd = (int *) malloc (sizeof (int));
+      if (client_sd == NULL)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+        continue;
+      }
+
+      client_sa_size = sizeof (client_sa);
+      *client_sd = accept (pollfds[i].fd,
+          (struct sockaddr *) &client_sa, &client_sa_size);
+      if (*client_sd < 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
+        continue;
+      }
+
+      RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
+          *client_sd);
+
+      status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
+          /* args = */ (void *) client_sd);
+      if (status != 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
+        close (*client_sd);
+        free (client_sd);
+        continue;
+      }
+
+      RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
+          "tid = %lu",
+          *((unsigned long *) &tid));
+    } /* for (pollfds_num) */
+  } /* while (do_shutdown == 0) */
+
+  close_listen_sockets ();
+
+  pthread_mutex_lock (&connetion_threads_lock);
+  while (connetion_threads_num > 0)
+  {
+    pthread_t wait_for;
+
+    wait_for = connetion_threads[0];
+
+    pthread_mutex_unlock (&connetion_threads_lock);
+    pthread_join (wait_for, /* retval = */ NULL);
+    pthread_mutex_lock (&connetion_threads_lock);
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
+
+  return (NULL);
+} /* }}} void *listen_thread_main */
+
+static int daemonize (void) /* {{{ */
+{
+  pid_t child;
+  int status;
+
+  child = fork ();
+  if (child < 0)
+  {
+    fprintf (stderr, "daemonize: fork(2) failed.\n");
+    return (-1);
+  }
+  else if (child > 0)
+  {
+    return (1);
+  }
+
+  /* Change into the /tmp directory. */
+  chdir ("/tmp");
+
+  /* Become session leader */
+  setsid ();
+
+  /* Open the first three file descriptors to /dev/null */
+  close (2);
+  close (1);
+  close (0);
+
+  open ("/dev/null", O_RDWR);
+  dup (0);
+  dup (0);
+
+  {
+    struct sigaction sa;
+
+    memset (&sa, 0, sizeof (sa));
+    sa.sa_handler = sig_int_handler;
+    sigaction (SIGINT, &sa, NULL);
+
+    memset (&sa, 0, sizeof (sa));
+    sa.sa_handler = sig_term_handler;
+    sigaction (SIGINT, &sa, NULL);
+
+    memset (&sa, 0, sizeof (sa));
+    sa.sa_handler = SIG_IGN;
+    sigaction (SIGPIPE, &sa, NULL);
+  }
+
+  openlog ("rrdd", LOG_PID, LOG_DAEMON);
+
+  cache_tree = g_tree_new ((GCompareFunc) strcmp);
+  if (cache_tree == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
+    return (-1);
+  }
+
+  memset (&queue_thread, 0, sizeof (queue_thread));
+  status = pthread_create (&queue_thread, /* attr = */ NULL,
+      queue_thread_main, /* args = */ NULL);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
+    return (-1);
+  }
+
+  return (0);
+} /* }}} int daemonize */
+
+static int cleanup (void) /* {{{ */
+{
+  RRDD_LOG (LOG_DEBUG, "cleanup ()");
+
+  do_shutdown++;
+
+  RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
+  pthread_cond_signal (&cache_cond);
+  pthread_join (queue_thread, /* return = */ NULL);
+  RRDD_LOG (LOG_DEBUG, "cleanup: done");
+
+  closelog ();
+
+  return (0);
+} /* }}} int cleanup */
+
+static int read_options (int argc, char **argv) /* {{{ */
+{
+  int option;
+  int status = 0;
+
+  while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
+  {
+    switch (option)
+    {
+      case 'l':
+      {
+        char **temp;
+
+        temp = (char **) realloc (config_listen_address_list,
+            sizeof (char *) * (config_listen_address_list_len + 1));
+        if (temp == NULL)
+        {
+          fprintf (stderr, "read_options: realloc failed.\n");
+          return (2);
+        }
+        config_listen_address_list = temp;
+
+        temp[config_listen_address_list_len] = strdup (optarg);
+        if (temp[config_listen_address_list_len] == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (2);
+        }
+        config_listen_address_list_len++;
+      }
+      break;
+
+      case 'f':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_flush_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid flush interval: %s\n", optarg);
+          status = 3;
+        }
+      }
+      break;
+
+      case 'w':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_write_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid write interval: %s\n", optarg);
+          status = 2;
+        }
+      }
+      break;
+
+      case 'h':
+      case '?':
+        printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
+            "\n"
+            "Usage: rrdd [options]\n"
+            "\n"
+            "Valid options are:\n"
+            "  -l <address>  Socket address to listen to.\n"
+            "  -w <seconds>  Interval in which to write data.\n"
+            "  -f <seconds>  Interval in which to flush dead data.\n"
+            "\n"
+            "For more information and a detailed description of all options "
+            "please refer\n"
+            "to the rrdd(1) manual page.\n",
+            VERSION);
+        status = -1;
+        break;
+    } /* switch (option) */
+  } /* while (getopt) */
+
+  return (status);
+} /* }}} int read_options */
+
+int main (int argc, char **argv)
+{
+  int status;
+
+  status = read_options (argc, argv);
+  if (status != 0)
+  {
+    if (status < 0)
+      status = 0;
+    return (status);
+  }
+
+  status = daemonize ();
+  if (status == 1)
+  {
+    struct sigaction sigchld;
+
+    memset (&sigchld, 0, sizeof (sigchld));
+    sigchld.sa_handler = SIG_IGN;
+    sigaction (SIGCHLD, &sigchld, NULL);
+
+    return (0);
+  }
+  else if (status != 0)
+  {
+    fprintf (stderr, "daemonize failed, exiting.\n");
+    return (1);
+  }
+
+  listen_thread_main (NULL);
+
+  cleanup ();
+
+  return (0);
+} /* int main */
+
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */