X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdd.c;h=c2f79365859139643612123a43cfb377318b5e3f;hb=5b7997e7e5e35b9fcf08969527994307ccec3d19;hp=6625da3f2725c632b206dc18c05cf4339b00e33b;hpb=cbf9f74e67a794dfd4658172125a3154e0beffc8;p=rrdd.git diff --git a/src/rrdd.c b/src/rrdd.c index 6625da3..c2f7936 100644 --- a/src/rrdd.c +++ b/src/rrdd.c @@ -75,6 +75,9 @@ static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; +static int config_write_interval = 300; +static int config_flush_interval = 3600; + /* * Functions */ @@ -122,6 +125,7 @@ static void *queue_thread_main (void *args) /* {{{ */ char *file; char **values; int values_num; + int status; int i; if (cache_queue_head == NULL) @@ -159,6 +163,14 @@ static void *queue_thread_main (void *args) /* {{{ */ RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", file, values_num, (void *) values); + status = rrd_update_r (file, NULL, values_num, values); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: " + "rrd_update_r failed with status %i.", + status); + } + free (file); for (i = 0; i < values_num; i++) free (values[i]); @@ -179,6 +191,7 @@ static int handle_request_update (int fd, /* {{{ */ char *value; char *buffer_ptr; int values_num = 0; + int status; time_t now; @@ -273,8 +286,7 @@ static int handle_request_update (int fd, /* {{{ */ values_num++; } - /* FIXME: Timeout should not be hard-coded. */ - if (((now - ci->last_flush_time) > 300) + if (((now - ci->last_flush_time) >= config_write_interval) && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) { RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", @@ -296,7 +308,13 @@ static int handle_request_update (int fd, /* {{{ */ snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); answer[sizeof (answer) - 1] = 0; - write (fd, answer, sizeof (answer)); + status = write (fd, answer, sizeof (answer)); + if (status < 0) + { + status = errno; + RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error."); + return (status); + } return (0); } /* }}} int handle_request_update */ @@ -345,6 +363,28 @@ static void *connection_thread_main (void *args) /* {{{ */ fd = *((int *) args); + RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to " + "connetion_threads[].."); + pthread_mutex_lock (&connetion_threads_lock); + { + pthread_t *temp; + + temp = (pthread_t *) realloc (connetion_threads, + sizeof (pthread_t) * (connetion_threads_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed."); + } + else + { + connetion_threads = temp; + connetion_threads[connetion_threads_num] = pthread_self (); + connetion_threads_num++; + } + } + pthread_mutex_unlock (&connetion_threads_lock); + RRDD_LOG (LOG_DEBUG, "connection_thread_main: done"); + while (do_shutdown == 0) { struct pollfd pollfd; @@ -368,13 +408,16 @@ static void *connection_thread_main (void *args) /* {{{ */ if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */ { + RRDD_LOG (LOG_DEBUG, "connection_thread_main: " + "poll(2) returned POLLHUP."); close (fd); break; } else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0) { - RRDD_LOG (LOG_WARNING, "connection_thread_main: poll(2) returned " - "something unexpected."); + RRDD_LOG (LOG_WARNING, "connection_thread_main: " + "poll(2) returned something unexpected: %#04hx", + pollfd.revents); close (fd); break; } @@ -533,27 +576,9 @@ static void *listen_thread_main (void *args) /* {{{ */ continue; } - /* FIXME: bug: if the new thread is run first, it may run into the - * assert (i < connetion_threads_num); - * assertion. */ - pthread_mutex_lock (&connetion_threads_lock); - { - pthread_t *temp; - - temp = (pthread_t *) realloc (connetion_threads, - sizeof (pthread_t) * (connetion_threads_num + 1)); - if (temp == NULL) - { - RRDD_LOG (LOG_ERR, "listen_thread_main: realloc failed."); - } - else - { - connetion_threads = temp; - connetion_threads[connetion_threads_num] = tid; - connetion_threads_num++; - } - } - pthread_mutex_unlock (&connetion_threads_lock); + RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: " + "tid = %lu", + *((unsigned long *) &tid)); } /* while (do_shutdown == 0) */ close_listen_sockets (); @@ -619,6 +644,10 @@ static int daemonize (void) /* {{{ */ memset (&sa, 0, sizeof (sa)); sa.sa_handler = sig_term_handler; sigaction (SIGINT, &sa, NULL); + + memset (&sa, 0, sizeof (sa)); + sa.sa_handler = SIG_IGN; + sigaction (SIGPIPE, &sa, NULL); } openlog ("rrdd", LOG_PID, LOG_DAEMON); @@ -648,19 +677,95 @@ static int cleanup (void) /* {{{ */ do_shutdown++; + RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread.."); + pthread_cond_signal (&cache_cond); pthread_join (queue_thread, /* return = */ NULL); + RRDD_LOG (LOG_DEBUG, "cleanup: done"); closelog (); return (0); } /* }}} int cleanup */ +static int read_options (int argc, char **argv) /* {{{ */ +{ + int option; + int status = 0; + + while ((option = getopt(argc, argv, "l:f:w:h?")) != -1) + { + switch (option) + { + case 'l': + { + printf ("Listening to: %s\n", optarg); + } + break; + + case 'f': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_flush_interval = temp; + else + { + fprintf (stderr, "Invalid flush interval: %s\n", optarg); + status = 3; + } + } + break; + + case 'w': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_write_interval = temp; + else + { + fprintf (stderr, "Invalid write interval: %s\n", optarg); + status = 2; + } + } + break; + + case 'h': + case '?': + printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" + "\n" + "Usage: rrdd [options]\n" + "\n" + "Valid options are:\n" + " -l
Socket address to listen to.\n" + " -w Interval in which to write data.\n" + " -f Interval in which to flush dead data.\n" + "\n" + "For more information and a detailed description of all options " + "please refer\n" + "to the rrdd(1) manual page.\n", + PACKAGE_VERSION); + status = -1; + break; + } /* switch (option) */ + } /* while (getopt) */ + + return (status); +} /* }}} int read_options */ + int main (int argc, char **argv) { int status; - printf ("%s by Florian Forster, Version %s\n", - PACKAGE_NAME, PACKAGE_VERSION); + status = read_options (argc, argv); + if (status != 0) + { + if (status < 0) + status = 0; + return (status); + } status = daemonize (); if (status == 1)