static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
+static int config_write_interval = 300;
+static int config_flush_interval = 3600;
+
/*
* Functions
*/
char *file;
char **values;
int values_num;
+ int status;
int i;
if (cache_queue_head == NULL)
RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
file, values_num, (void *) values);
+ status = rrd_update_r (file, NULL, values_num, values);
+ if (status != 0)
+ {
+ RRDD_LOG (LOG_ERR, "queue_thread_main: "
+ "rrd_update_r failed with status %i.",
+ status);
+ }
+
free (file);
for (i = 0; i < values_num; i++)
free (values[i]);
char *value;
char *buffer_ptr;
int values_num = 0;
+ int status;
time_t now;
values_num++;
}
- /* FIXME: Timeout should not be hard-coded. */
- if (((now - ci->last_flush_time) > 300)
+ if (((now - ci->last_flush_time) >= config_write_interval)
&& ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
{
RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
answer[sizeof (answer) - 1] = 0;
- write (fd, answer, sizeof (answer));
+ status = write (fd, answer, sizeof (answer));
+ if (status < 0)
+ {
+ status = errno;
+ RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
+ return (status);
+ }
return (0);
} /* }}} int handle_request_update */
fd = *((int *) args);
+ RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
+ "connetion_threads[]..");
+ pthread_mutex_lock (&connetion_threads_lock);
+ {
+ pthread_t *temp;
+
+ temp = (pthread_t *) realloc (connetion_threads,
+ sizeof (pthread_t) * (connetion_threads_num + 1));
+ if (temp == NULL)
+ {
+ RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
+ }
+ else
+ {
+ connetion_threads = temp;
+ connetion_threads[connetion_threads_num] = pthread_self ();
+ connetion_threads_num++;
+ }
+ }
+ pthread_mutex_unlock (&connetion_threads_lock);
+ RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
+
while (do_shutdown == 0)
{
struct pollfd pollfd;
if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
{
+ RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
+ "poll(2) returned POLLHUP.");
close (fd);
break;
}
else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
{
- RRDD_LOG (LOG_WARNING, "connection_thread_main: poll(2) returned "
- "something unexpected.");
+ RRDD_LOG (LOG_WARNING, "connection_thread_main: "
+ "poll(2) returned something unexpected: %#04hx",
+ pollfd.revents);
close (fd);
break;
}
continue;
}
- /* FIXME: bug: if the new thread is run first, it may run into the
- * assert (i < connetion_threads_num);
- * assertion. */
- pthread_mutex_lock (&connetion_threads_lock);
- {
- pthread_t *temp;
-
- temp = (pthread_t *) realloc (connetion_threads,
- sizeof (pthread_t) * (connetion_threads_num + 1));
- if (temp == NULL)
- {
- RRDD_LOG (LOG_ERR, "listen_thread_main: realloc failed.");
- }
- else
- {
- connetion_threads = temp;
- connetion_threads[connetion_threads_num] = tid;
- connetion_threads_num++;
- }
- }
- pthread_mutex_unlock (&connetion_threads_lock);
+ RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
+ "tid = %lu",
+ *((unsigned long *) &tid));
} /* while (do_shutdown == 0) */
close_listen_sockets ();
memset (&sa, 0, sizeof (sa));
sa.sa_handler = sig_term_handler;
sigaction (SIGINT, &sa, NULL);
+
+ memset (&sa, 0, sizeof (sa));
+ sa.sa_handler = SIG_IGN;
+ sigaction (SIGPIPE, &sa, NULL);
}
openlog ("rrdd", LOG_PID, LOG_DAEMON);
do_shutdown++;
+ RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
+ pthread_cond_signal (&cache_cond);
pthread_join (queue_thread, /* return = */ NULL);
+ RRDD_LOG (LOG_DEBUG, "cleanup: done");
closelog ();
return (0);
} /* }}} int cleanup */
+static int read_options (int argc, char **argv) /* {{{ */
+{
+ int option;
+ int status = 0;
+
+ while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
+ {
+ switch (option)
+ {
+ case 'l':
+ {
+ printf ("Listening to: %s\n", optarg);
+ }
+ break;
+
+ case 'f':
+ {
+ int temp;
+
+ temp = atoi (optarg);
+ if (temp > 0)
+ config_flush_interval = temp;
+ else
+ {
+ fprintf (stderr, "Invalid flush interval: %s\n", optarg);
+ status = 3;
+ }
+ }
+ break;
+
+ case 'w':
+ {
+ int temp;
+
+ temp = atoi (optarg);
+ if (temp > 0)
+ config_write_interval = temp;
+ else
+ {
+ fprintf (stderr, "Invalid write interval: %s\n", optarg);
+ status = 2;
+ }
+ }
+ break;
+
+ case 'h':
+ case '?':
+ printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n"
+ "\n"
+ "Usage: rrdd [options]\n"
+ "\n"
+ "Valid options are:\n"
+ " -l <address> Socket address to listen to.\n"
+ " -w <seconds> Interval in which to write data.\n"
+ " -f <seconds> Interval in which to flush dead data.\n"
+ "\n"
+ "For more information and a detailed description of all options "
+ "please refer\n"
+ "to the rrdd(1) manual page.\n",
+ PACKAGE_VERSION);
+ status = -1;
+ break;
+ } /* switch (option) */
+ } /* while (getopt) */
+
+ return (status);
+} /* }}} int read_options */
+
int main (int argc, char **argv)
{
int status;
- printf ("%s by Florian Forster, Version %s\n",
- PACKAGE_NAME, PACKAGE_VERSION);
+ status = read_options (argc, argv);
+ if (status != 0)
+ {
+ if (status < 0)
+ status = 0;
+ return (status);
+ }
status = daemonize ();
if (status == 1)