X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrdd.c;h=03b9bbc22860e68572fe20bff972eb0f5ba72171;hb=057ea5766358de9f0a821bad3a5ab86ee8886e71;hp=14543ce08e23eb0a2aa339e3519119d9106b2983;hpb=6e43a35ad73df70b79248ba7f30741ea85be4d5c;p=rrdd.git diff --git a/src/rrdd.c b/src/rrdd.c index 14543ce..03b9bbc 100644 --- a/src/rrdd.c +++ b/src/rrdd.c @@ -22,6 +22,8 @@ #define RRDD_DEBUG 1 #include "rrdd.h" +#include +#include #if RRDD_DEBUG # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0) @@ -69,12 +71,18 @@ static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; static int connetion_threads_num = 0; /* Cache stuff */ -static avl_tree_t *cache_tree = NULL; +static GTree *cache_tree = NULL; static cache_item_t *cache_queue_head = NULL; static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; +static int config_write_interval = 300; +static int config_flush_interval = 3600; + +static char **config_listen_address_list = NULL; +static int config_listen_address_list_len = 0; + /* * Functions */ @@ -88,46 +96,114 @@ static void sig_term_handler (int signal) /* {{{ */ do_shutdown++; } /* }}} void sig_term_handler */ -static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */ +/* + * enqueue_cache_item: + * `cache_lock' must be acquired before calling this function! + */ +static int enqueue_cache_item (cache_item_t *ci) /* {{{ */ { - cache_item_t *c0 = (cache_item_t *) v0; - cache_item_t *c1 = (cache_item_t *) v1; + RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", + ci->file); + + if (ci == NULL) + return (-1); + + if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + return (-1); - assert (c0->file != NULL); - assert (c1->file != NULL); + assert (ci->next == NULL); - return (strcmp (c0->file, c1->file)); -} /* }}} int cache_tree_compare */ + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; + cache_queue_tail = ci; + + return (0); +} /* }}} int enqueue_cache_item */ -static void cache_tree_free (void *v) /* {{{ */ +/* + * tree_callback_flush: + * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held + * while this is in progress. + */ +static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ + gpointer data) { - cache_item_t *c = (cache_item_t *) v; + cache_item_t *ci; + time_t now; - assert (c->values_num == 0); - assert ((c->flags & CI_FLAGS_IN_TREE) != 0); - assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0); + ci = (cache_item_t *) value; + now = *((time_t *) data); - free (c->file); - c->file = NULL; - free (c); -} /* }}} void cache_tree_free */ + if (((now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + enqueue_cache_item (ci); + + return (TRUE); +} /* }}} gboolean tree_callback_flush */ static void *queue_thread_main (void *args) /* {{{ */ { + struct timeval now; + struct timespec next_flush; + + gettimeofday (&now, NULL); + next_flush.tv_sec = now.tv_sec + config_flush_interval; + next_flush.tv_nsec = 1000 * now.tv_usec; + pthread_mutex_lock (&cache_lock); while ((do_shutdown == 0) || (cache_queue_head != NULL)) { cache_item_t *ci; - char *file; char **values; int values_num; int status; int i; + /* First, check if it's time to do the cache flush. */ + gettimeofday (&now, NULL); + if ((now.tv_sec > next_flush.tv_sec) + || ((now.tv_sec == next_flush.tv_sec) + && ((1000 * now.tv_usec) > next_flush.tv_nsec))) + { + time_t time_now; + + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + time_now = time (NULL); + + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now); + + /* Determine the time of the next cache flush. */ + while (next_flush.tv_sec < now.tv_sec) + next_flush.tv_sec += config_flush_interval; + } + + /* Now, check if there's something to store away. If not, wait until + * something comes in or it's time to do the cache flush. */ if (cache_queue_head == NULL) - pthread_cond_wait (&cache_cond, &cache_lock); + { + struct timespec timeout; + + timeout.tv_sec = next_flush.tv_sec - now.tv_sec; + if (next_flush.tv_nsec < (1000 * now.tv_usec)) + { + timeout.tv_sec--; + timeout.tv_nsec = 1000000000 + next_flush.tv_nsec + - (1000 * now.tv_usec); + } + else + { + timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec); + } + + pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout); + } + /* Check if a value has arrived. This may be NULL if we timed out or there + * was an interrupt such as a signal. */ if (cache_queue_head == NULL) continue; @@ -160,7 +236,7 @@ static void *queue_thread_main (void *args) /* {{{ */ RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", file, values_num, (void *) values); - status = rrd_update_r (file, NULL, values_num, values); + status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { RRDD_LOG (LOG_ERR, "queue_thread_main: " @@ -188,13 +264,11 @@ static int handle_request_update (int fd, /* {{{ */ char *value; char *buffer_ptr; int values_num = 0; + int status; time_t now; - avl_node_t *node; - cache_item_t ci_temp; cache_item_t *ci; - char answer[4096]; now = time (NULL); @@ -207,12 +281,10 @@ static int handle_request_update (int fd, /* {{{ */ file = buffer_ptr; buffer_ptr += strlen (file) + 1; - ci_temp.file = file; - pthread_mutex_lock (&cache_lock); - node = avl_search (cache_tree, (void *) &ci_temp); - if (node == NULL) + ci = g_tree_lookup (cache_tree, file); + if (ci == NULL) { ci = (cache_item_t *) malloc (sizeof (cache_item_t)); if (ci == NULL) @@ -237,22 +309,11 @@ static int handle_request_update (int fd, /* {{{ */ ci->last_flush_time = now; ci->flags = CI_FLAGS_IN_TREE; - if (avl_insert (cache_tree, (void *) ci) == NULL) - { - pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_ERR, "handle_request_update: avl_insert failed."); - free (ci->file); - free (ci); - return (-1); - } + g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); - RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new AVL node %s.", + RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.", ci->file); } - else /* if (ci != NULL) */ - { - ci = (cache_item_t *) node->item; - } assert (ci != NULL); while (*buffer_ptr != 0) @@ -282,21 +343,10 @@ static int handle_request_update (int fd, /* {{{ */ values_num++; } - /* FIXME: Timeout should not be hard-coded. */ - if (((now - ci->last_flush_time) > 300) + if (((now - ci->last_flush_time) >= config_write_interval) && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) { - RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", - ci->file); - - assert (ci->next == NULL); - - if (cache_queue_tail == NULL) - cache_queue_head = ci; - else - cache_queue_tail->next = ci; - cache_queue_tail = ci; - + enqueue_cache_item (ci); pthread_cond_signal (&cache_cond); } @@ -305,7 +355,13 @@ static int handle_request_update (int fd, /* {{{ */ snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); answer[sizeof (answer) - 1] = 0; - write (fd, answer, sizeof (answer)); + status = write (fd, answer, sizeof (answer)); + if (status < 0) + { + status = errno; + RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error."); + return (status); + } return (0); } /* }}} int handle_request_update */ @@ -399,13 +455,16 @@ static void *connection_thread_main (void *args) /* {{{ */ if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */ { + RRDD_LOG (LOG_DEBUG, "connection_thread_main: " + "poll(2) returned POLLHUP."); close (fd); break; } else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0) { - RRDD_LOG (LOG_WARNING, "connection_thread_main: poll(2) returned " - "something unexpected."); + RRDD_LOG (LOG_WARNING, "connection_thread_main: " + "poll(2) returned something unexpected: %#04hx", + pollfd.revents); close (fd); break; } @@ -442,7 +501,7 @@ static void *connection_thread_main (void *args) /* {{{ */ return (NULL); } /* }}} void *connection_thread_main */ -static int open_listen_socket (const char *path) /* {{{ */ +static int open_listen_socket_unix (const char *path) /* {{{ */ { int fd; struct sockaddr_un sa; @@ -453,7 +512,7 @@ static int open_listen_socket (const char *path) /* {{{ */ sizeof (listen_fds[0]) * (listen_fds_num + 1)); if (temp == NULL) { - RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed."); + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed."); return (-1); } listen_fds = temp; @@ -462,7 +521,7 @@ static int open_listen_socket (const char *path) /* {{{ */ fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); if (fd < 0) { - RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed."); + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed."); return (-1); } @@ -473,7 +532,7 @@ static int open_listen_socket (const char *path) /* {{{ */ status = bind (fd, (struct sockaddr *) &sa, sizeof (sa)); if (status != 0) { - RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed."); + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed."); close (fd); unlink (path); return (-1); @@ -482,18 +541,97 @@ static int open_listen_socket (const char *path) /* {{{ */ status = listen (fd, /* backlog = */ 10); if (status != 0) { - RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed."); + RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed."); close (fd); unlink (path); return (-1); } listen_fds[listen_fds_num].fd = fd; - strncpy (listen_fds[listen_fds_num].path, path, - sizeof (listen_fds[listen_fds_num].path) - 1); + snprintf (listen_fds[listen_fds_num].path, + sizeof (listen_fds[listen_fds_num].path) - 1, + "unix:%s", path); listen_fds_num++; return (0); +} /* }}} int open_listen_socket_unix */ + +static int open_listen_socket (const char *addr) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_res; + struct addrinfo *ai_ptr; + int status; + + assert (addr != NULL); + + if (strncmp ("unix:", addr, strlen ("unix:")) == 0) + return (open_listen_socket_unix (addr + strlen ("unix:"))); + else if (addr[0] == '/') + return (open_listen_socket_unix (addr)); + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_res = NULL; + status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: " + "%s", addr, gai_strerror (status)); + return (-1); + } + + for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + int fd; + listen_socket_t *temp; + + temp = (listen_socket_t *) realloc (listen_fds, + sizeof (listen_fds[0]) * (listen_fds_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed."); + continue; + } + listen_fds = temp; + memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0])); + + fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (fd < 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed."); + continue; + } + + status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed."); + close (fd); + continue; + } + + status = listen (fd, /* backlog = */ 10); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed."); + close (fd); + return (-1); + } + + listen_fds[listen_fds_num].fd = fd; + strncpy (listen_fds[listen_fds_num].path, addr, + sizeof (listen_fds[listen_fds_num].path) - 1); + listen_fds_num++; + } /* for (ai_ptr) */ + + return (0); } /* }}} int open_listen_socket */ static int close_listen_sockets (void) /* {{{ */ @@ -503,7 +641,8 @@ static int close_listen_sockets (void) /* {{{ */ for (i = 0; i < listen_fds_num; i++) { close (listen_fds[i].fd); - unlink (listen_fds[i].path); + if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0) + unlink (listen_fds[i].path + strlen ("unix:")); } free (listen_fds); @@ -515,58 +654,109 @@ static int close_listen_sockets (void) /* {{{ */ static void *listen_thread_main (void *args) /* {{{ */ { - char buffer[4096]; + struct pollfd *pollfds; + int pollfds_num; int status; int i; - status = open_listen_socket (RRDD_SOCK_PATH); - if (status != 0) + for (i = 0; i < config_listen_address_list_len; i++) { - RRDD_LOG (LOG_ERR, "listen_thread_main: open_listen_socket failed."); + RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] " + "= %s", i, config_listen_address_list[i]); + open_listen_socket (config_listen_address_list[i]); + } + + if (config_listen_address_list_len < 1) + open_listen_socket (RRDD_SOCK_PATH); + + if (listen_fds_num < 1) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets " + "could be opened. Sorry."); return (NULL); } - while (do_shutdown == 0) + pollfds_num = listen_fds_num; + pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num); + if (pollfds == NULL) { - int *client_sd; - struct sockaddr_un client_sa; - socklen_t client_sa_size; - pthread_t tid; + RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); + return (NULL); + } + memset (pollfds, 0, sizeof (*pollfds) * pollfds_num); - client_sd = (int *) malloc (sizeof (int)); - if (client_sd == NULL) + while (do_shutdown == 0) + { + assert (pollfds_num == listen_fds_num); + for (i = 0; i < pollfds_num; i++) { - RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); - sleep (120); - continue; + pollfds[i].fd = listen_fds[i].fd; + pollfds[i].events = POLLIN | POLLPRI; + pollfds[i].revents = 0; } - client_sa_size = sizeof (client_sa); - /* FIXME: Don't implement listen_fds as a list or use poll(2) here! */ - *client_sd = accept (listen_fds[0].fd, - (struct sockaddr *) &client_sa, &client_sa_size); - if (*client_sd < 0) + status = poll (pollfds, pollfds_num, /* timeout = */ -1); + if (status < 1) { - RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed."); + status = errno; + if (status != EINTR) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed."); + } continue; } - RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.", - *client_sd); - - status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main, - /* args = */ (void *) client_sd); - if (status != 0) + for (i = 0; i < pollfds_num; i++) { - RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed."); - close (*client_sd); - free (client_sd); - continue; - } + int *client_sd; + struct sockaddr_storage client_sa; + socklen_t client_sa_size; + pthread_t tid; + + if (pollfds[i].revents == 0) + continue; + + if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: " + "poll(2) returned something unexpected for listen FD #%i.", + pollfds[i].fd); + continue; + } + + client_sd = (int *) malloc (sizeof (int)); + if (client_sd == NULL) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); + continue; + } + + client_sa_size = sizeof (client_sa); + *client_sd = accept (pollfds[i].fd, + (struct sockaddr *) &client_sa, &client_sa_size); + if (*client_sd < 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed."); + continue; + } + + RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.", + *client_sd); + + status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main, + /* args = */ (void *) client_sd); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed."); + close (*client_sd); + free (client_sd); + continue; + } - RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: " - "tid = %lu", - *((unsigned long *) &tid)); + RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: " + "tid = %lu", + *((unsigned long *) &tid)); + } /* for (pollfds_num) */ } /* while (do_shutdown == 0) */ close_listen_sockets (); @@ -591,7 +781,9 @@ static void *listen_thread_main (void *args) /* {{{ */ static int daemonize (void) /* {{{ */ { +#if !RRDD_DEBUG pid_t child; +#endif int status; #if !RRDD_DEBUG @@ -640,10 +832,10 @@ static int daemonize (void) /* {{{ */ openlog ("rrdd", LOG_PID, LOG_DAEMON); - cache_tree = avl_alloc_tree (cache_tree_compare, cache_tree_free); + cache_tree = g_tree_new ((GCompareFunc) strcmp); if (cache_tree == NULL) { - RRDD_LOG (LOG_ERR, "daemonize: avl_alloc_tree failed."); + RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed."); return (-1); } @@ -675,12 +867,102 @@ static int cleanup (void) /* {{{ */ return (0); } /* }}} int cleanup */ +static int read_options (int argc, char **argv) /* {{{ */ +{ + int option; + int status = 0; + + while ((option = getopt(argc, argv, "l:f:w:h?")) != -1) + { + switch (option) + { + case 'l': + { + char **temp; + + temp = (char **) realloc (config_listen_address_list, + sizeof (char *) * (config_listen_address_list_len + 1)); + if (temp == NULL) + { + fprintf (stderr, "read_options: realloc failed.\n"); + return (2); + } + config_listen_address_list = temp; + + temp[config_listen_address_list_len] = strdup (optarg); + if (temp[config_listen_address_list_len] == NULL) + { + fprintf (stderr, "read_options: strdup failed.\n"); + return (2); + } + config_listen_address_list_len++; + } + break; + + case 'f': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_flush_interval = temp; + else + { + fprintf (stderr, "Invalid flush interval: %s\n", optarg); + status = 3; + } + } + break; + + case 'w': + { + int temp; + + temp = atoi (optarg); + if (temp > 0) + config_write_interval = temp; + else + { + fprintf (stderr, "Invalid write interval: %s\n", optarg); + status = 2; + } + } + break; + + case 'h': + case '?': + printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" + "\n" + "Usage: rrdd [options]\n" + "\n" + "Valid options are:\n" + " -l
Socket address to listen to.\n" + " -w Interval in which to write data.\n" + " -f Interval in which to flush dead data.\n" + "\n" + "For more information and a detailed description of all options " + "please refer\n" + "to the rrdd(1) manual page.\n", + PACKAGE_VERSION); + status = -1; + break; + } /* switch (option) */ + } /* while (getopt) */ + + return (status); +} /* }}} int read_options */ + int main (int argc, char **argv) { int status; - printf ("%s by Florian Forster, Version %s\n", - PACKAGE_NAME, PACKAGE_VERSION); + status = read_options (argc, argv); + if (status != 0) + { + if (status < 0) + status = 0; + return (status); + } status = daemonize (); if (status == 1)