src/rrdd.c: Implement option parsing.
[rrdd.git] / src / rrdd.c
index 6625da3..c2f7936 100644 (file)
@@ -75,6 +75,9 @@ static cache_item_t   *cache_queue_tail = NULL;
 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
 
+static int config_write_interval = 300;
+static int config_flush_interval = 3600;
+
 /* 
  * Functions
  */
@@ -122,6 +125,7 @@ static void *queue_thread_main (void *args) /* {{{ */
     char *file;
     char **values;
     int values_num;
+    int status;
     int i;
 
     if (cache_queue_head == NULL)
@@ -159,6 +163,14 @@ static void *queue_thread_main (void *args) /* {{{ */
     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
         file, values_num, (void *) values);
 
+    status = rrd_update_r (file, NULL, values_num, values);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: "
+          "rrd_update_r failed with status %i.",
+          status);
+    }
+
     free (file);
     for (i = 0; i < values_num; i++)
       free (values[i]);
@@ -179,6 +191,7 @@ static int handle_request_update (int fd, /* {{{ */
   char *value;
   char *buffer_ptr;
   int values_num = 0;
+  int status;
 
   time_t now;
 
@@ -273,8 +286,7 @@ static int handle_request_update (int fd, /* {{{ */
     values_num++;
   }
 
-  /* FIXME: Timeout should not be hard-coded. */
-  if (((now - ci->last_flush_time) > 300)
+  if (((now - ci->last_flush_time) >= config_write_interval)
       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
   {
     RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
@@ -296,7 +308,13 @@ static int handle_request_update (int fd, /* {{{ */
   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
   answer[sizeof (answer) - 1] = 0;
 
-  write (fd, answer, sizeof (answer));
+  status = write (fd, answer, sizeof (answer));
+  if (status < 0)
+  {
+    status = errno;
+    RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
+    return (status);
+  }
 
   return (0);
 } /* }}} int handle_request_update */
@@ -345,6 +363,28 @@ static void *connection_thread_main (void *args) /* {{{ */
   
   fd = *((int *) args);
 
+  RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
+      "connetion_threads[]..");
+  pthread_mutex_lock (&connetion_threads_lock);
+  {
+    pthread_t *temp;
+
+    temp = (pthread_t *) realloc (connetion_threads,
+        sizeof (pthread_t) * (connetion_threads_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
+    }
+    else
+    {
+      connetion_threads = temp;
+      connetion_threads[connetion_threads_num] = pthread_self ();
+      connetion_threads_num++;
+    }
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+  RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
+
   while (do_shutdown == 0)
   {
     struct pollfd pollfd;
@@ -368,13 +408,16 @@ static void *connection_thread_main (void *args) /* {{{ */
 
     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
     {
+      RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
+          "poll(2) returned POLLHUP.");
       close (fd);
       break;
     }
     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
     {
-      RRDD_LOG (LOG_WARNING, "connection_thread_main: poll(2) returned "
-          "something unexpected.");
+      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
+          "poll(2) returned something unexpected: %#04hx",
+          pollfd.revents);
       close (fd);
       break;
     }
@@ -533,27 +576,9 @@ static void *listen_thread_main (void *args) /* {{{ */
       continue;
     }
 
-    /* FIXME: bug: if the new thread is run first, it may run into the
-     *   assert (i < connetion_threads_num);
-     * assertion. */
-    pthread_mutex_lock (&connetion_threads_lock);
-    {
-      pthread_t *temp;
-
-      temp = (pthread_t *) realloc (connetion_threads,
-          sizeof (pthread_t) * (connetion_threads_num + 1));
-      if (temp == NULL)
-      {
-        RRDD_LOG (LOG_ERR, "listen_thread_main: realloc failed.");
-      }
-      else
-      {
-        connetion_threads = temp;
-        connetion_threads[connetion_threads_num] = tid;
-        connetion_threads_num++;
-      }
-    }
-    pthread_mutex_unlock (&connetion_threads_lock);
+    RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
+        "tid = %lu",
+        *((unsigned long *) &tid));
   } /* while (do_shutdown == 0) */
 
   close_listen_sockets ();
@@ -619,6 +644,10 @@ static int daemonize (void) /* {{{ */
     memset (&sa, 0, sizeof (sa));
     sa.sa_handler = sig_term_handler;
     sigaction (SIGINT, &sa, NULL);
+
+    memset (&sa, 0, sizeof (sa));
+    sa.sa_handler = SIG_IGN;
+    sigaction (SIGPIPE, &sa, NULL);
   }
 
   openlog ("rrdd", LOG_PID, LOG_DAEMON);
@@ -648,19 +677,95 @@ static int cleanup (void) /* {{{ */
 
   do_shutdown++;
 
+  RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
+  pthread_cond_signal (&cache_cond);
   pthread_join (queue_thread, /* return = */ NULL);
+  RRDD_LOG (LOG_DEBUG, "cleanup: done");
 
   closelog ();
 
   return (0);
 } /* }}} int cleanup */
 
+static int read_options (int argc, char **argv) /* {{{ */
+{
+  int option;
+  int status = 0;
+
+  while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
+  {
+    switch (option)
+    {
+      case 'l':
+      {
+        printf ("Listening to: %s\n", optarg);
+      }
+      break;
+
+      case 'f':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_flush_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid flush interval: %s\n", optarg);
+          status = 3;
+        }
+      }
+      break;
+
+      case 'w':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_write_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid write interval: %s\n", optarg);
+          status = 2;
+        }
+      }
+      break;
+
+      case 'h':
+      case '?':
+        printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
+            "\n"
+            "Usage: rrdd [options]\n"
+            "\n"
+            "Valid options are:\n"
+            "  -l <address>  Socket address to listen to.\n"
+            "  -w <seconds>  Interval in which to write data.\n"
+            "  -f <seconds>  Interval in which to flush dead data.\n"
+            "\n"
+            "For more information and a detailed description of all options "
+            "please refer\n"
+            "to the rrdd(1) manual page.\n",
+            PACKAGE_VERSION);
+        status = -1;
+        break;
+    } /* switch (option) */
+  } /* while (getopt) */
+
+  return (status);
+} /* }}} int read_options */
+
 int main (int argc, char **argv)
 {
   int status;
 
-  printf ("%s by Florian Forster, Version %s\n",
-      PACKAGE_NAME, PACKAGE_VERSION);
+  status = read_options (argc, argv);
+  if (status != 0)
+  {
+    if (status < 0)
+      status = 0;
+    return (status);
+  }
 
   status = daemonize ();
   if (status == 1)