From cbf9f74e67a794dfd4658172125a3154e0beffc8 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sat, 21 Jun 2008 15:44:43 +0200 Subject: [PATCH] src/rrdd.c: A first take at implementing a cache. This is WORK IN PROGRESS and not working correctly yet! Also in this commit: Many other unstructured changes, since this we're still in the big bang phase of this project ;) --- src/rrdd.c | 470 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 443 insertions(+), 27 deletions(-) diff --git a/src/rrdd.c b/src/rrdd.c index 13fdc90..6625da3 100644 --- a/src/rrdd.c +++ b/src/rrdd.c @@ -19,8 +19,16 @@ * Florian octo Forster **/ +#define RRDD_DEBUG 1 + #include "rrdd.h" +#if RRDD_DEBUG +# define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0) +#else +# define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) +#endif + /* * Types */ @@ -31,6 +39,21 @@ struct listen_socket_s }; typedef struct listen_socket_s listen_socket_t; +struct cache_item_s; +typedef struct cache_item_s cache_item_t; +struct cache_item_s +{ + char *file; + char **values; + int values_num; + time_t last_flush_time; +#define CI_FLAGS_IN_TREE 0x01 +#define CI_FLAGS_IN_QUEUE 0x02 + int flags; + + cache_item_t *next; +}; + /* * Variables */ @@ -41,31 +64,352 @@ static int do_shutdown = 0; static pthread_t queue_thread; +static pthread_t *connetion_threads = NULL; +static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connetion_threads_num = 0; + +/* Cache stuff */ +static avl_tree_t *cache_tree = NULL; +static cache_item_t *cache_queue_head = NULL; +static cache_item_t *cache_queue_tail = NULL; +static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; + /* * Functions */ +static void sig_int_handler (int signal) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_int_handler */ + +static void sig_term_handler (int signal) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_term_handler */ + +static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */ +{ + cache_item_t *c0 = (cache_item_t *) v0; + cache_item_t *c1 = (cache_item_t *) v1; + + assert (c0->file != NULL); + assert (c1->file != NULL); + + return (strcmp (c0->file, c1->file)); +} /* }}} int cache_tree_compare */ + +static void cache_tree_free (void *v) /* {{{ */ +{ + cache_item_t *c = (cache_item_t *) v; + + assert (c->values_num == 0); + assert ((c->flags & CI_FLAGS_IN_TREE) != 0); + assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0); + + free (c->file); + c->file = NULL; + free (c); +} /* }}} void cache_tree_free */ + static void *queue_thread_main (void *args) /* {{{ */ { - while (do_shutdown == 0) + pthread_mutex_lock (&cache_lock); + while ((do_shutdown == 0) || (cache_queue_head != NULL)) { - syslog (LOG_DEBUG, "queue_thread_main: Just woke up."); - sleep (1); + cache_item_t *ci; + + char *file; + char **values; + int values_num; + int i; + + if (cache_queue_head == NULL) + pthread_cond_wait (&cache_cond, &cache_lock); + + if (cache_queue_head == NULL) + continue; + + ci = cache_queue_head; + + /* copy the relevant parts */ + file = strdup (ci->file); + if (file == NULL) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed."); + continue; + } + + values = ci->values; + values_num = ci->values_num; + + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = time (NULL); + ci->flags &= ~(CI_FLAGS_IN_QUEUE); + + cache_queue_head = ci->next; + if (cache_queue_head == NULL) + cache_queue_tail = NULL; + ci->next = NULL; + + pthread_mutex_unlock (&cache_lock); + + RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", + file, values_num, (void *) values); + + free (file); + for (i = 0; i < values_num; i++) + free (values[i]); + + pthread_mutex_lock (&cache_lock); } /* while (do_shutdown == 0) */ + pthread_mutex_unlock (&cache_lock); - syslog (LOG_DEBUG, "queue_thread_main: Exiting."); + RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting."); return (NULL); } /* }}} void *queue_thread_main */ -static void sig_int_handler (int signal) /* {{{ */ +static int handle_request_update (int fd, /* {{{ */ + char *buffer, int buffer_size) { - do_shutdown++; -} /* }}} void sig_int_handler */ + char *file; + char *value; + char *buffer_ptr; + int values_num = 0; -static void sig_term_handler (int signal) /* {{{ */ + time_t now; + + avl_node_t *node; + cache_item_t ci_temp; + cache_item_t *ci; + + char answer[4096]; + + now = time (NULL); + + RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)", + fd, (void *) buffer, buffer_size); + + buffer_ptr = buffer; + + file = buffer_ptr; + buffer_ptr += strlen (file) + 1; + + ci_temp.file = file; + + pthread_mutex_lock (&cache_lock); + + node = avl_search (cache_tree, (void *) &ci_temp); + if (node == NULL) + { + ci = (cache_item_t *) malloc (sizeof (cache_item_t)); + if (ci == NULL) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); + return (-1); + } + memset (ci, 0, sizeof (cache_item_t)); + + ci->file = strdup (file); + if (ci->file == NULL) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); + free (ci); + return (-1); + } + + ci->values = NULL; + ci->values_num = 0; + ci->last_flush_time = now; + ci->flags = CI_FLAGS_IN_TREE; + + if (avl_insert (cache_tree, (void *) ci) == NULL) + { + pthread_mutex_unlock (&cache_lock); + RRDD_LOG (LOG_ERR, "handle_request_update: avl_insert failed."); + free (ci->file); + free (ci); + return (-1); + } + + RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new AVL node %s.", + ci->file); + } + else /* if (ci != NULL) */ + { + ci = (cache_item_t *) node->item; + } + assert (ci != NULL); + + while (*buffer_ptr != 0) + { + char **temp; + + value = buffer_ptr; + buffer_ptr += strlen (value) + 1; + + temp = (char **) realloc (ci->values, + sizeof (char *) * (ci->values_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed."); + continue; + } + ci->values = temp; + + ci->values[ci->values_num] = strdup (value); + if (ci->values[ci->values_num] == NULL) + { + RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); + continue; + } + ci->values_num++; + + values_num++; + } + + /* FIXME: Timeout should not be hard-coded. */ + if (((now - ci->last_flush_time) > 300) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)) + { + RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.", + ci->file); + + assert (ci->next == NULL); + + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; + cache_queue_tail = ci; + + pthread_cond_signal (&cache_cond); + } + + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num); + answer[sizeof (answer) - 1] = 0; + + write (fd, answer, sizeof (answer)); + + return (0); +} /* }}} int handle_request_update */ + +static int handle_request (int fd) /* {{{ */ { - do_shutdown++; -} /* }}} void sig_term_handler */ + char buffer[4096]; + int buffer_size; + + RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd); + + buffer_size = read (fd, buffer, sizeof (buffer)); + if (buffer_size < 1) + { + RRDD_LOG (LOG_ERR, "handle_request: read(2) failed."); + return (-1); + } + assert (((size_t) buffer_size) <= sizeof (buffer)); + + if ((buffer[buffer_size - 2] != 0) + || (buffer[buffer_size - 1] != 0)) + { + RRDD_LOG (LOG_INFO, "handle_request: malformed request."); + return (-1); + } + + /* fields in the buffer a separated by null bytes. */ + if (strcmp (buffer, "update") == 0) + { + int offset = strlen ("update") + 1; + return (handle_request_update (fd, buffer + offset, + buffer_size - offset)); + } + else + { + RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer); + return (-1); + } +} /* }}} int handle_request */ + +static void *connection_thread_main (void *args) /* {{{ */ +{ + pthread_t self; + int i; + int fd; + + fd = *((int *) args); + + while (do_shutdown == 0) + { + struct pollfd pollfd; + int status; + + pollfd.fd = fd; + pollfd.events = POLLIN | POLLPRI; + pollfd.revents = 0; + + status = poll (&pollfd, 1, /* timeout = */ 500); + if (status == 0) /* timeout */ + continue; + else if (status < 0) /* error */ + { + status = errno; + if (status == EINTR) + continue; + RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed."); + continue; + } + + if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */ + { + close (fd); + break; + } + else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0) + { + RRDD_LOG (LOG_WARNING, "connection_thread_main: poll(2) returned " + "something unexpected."); + close (fd); + break; + } + + status = handle_request (fd); + if (status != 0) + { + close (fd); + break; + } + } + + self = pthread_self (); + /* Remove this thread from the connection threads list */ + pthread_mutex_lock (&connetion_threads_lock); + /* Find out own index in the array */ + for (i = 0; i < connetion_threads_num; i++) + if (pthread_equal (connetion_threads[i], self) != 0) + break; + assert (i < connetion_threads_num); + + /* Move the trailing threads forward. */ + if (i < (connetion_threads_num - 1)) + { + memmove (connetion_threads + i, + connetion_threads + i + 1, + sizeof (pthread_t) * (connetion_threads_num - i - 1)); + } + + connetion_threads_num--; + pthread_mutex_unlock (&connetion_threads_lock); + + free (args); + return (NULL); +} /* }}} void *connection_thread_main */ static int open_listen_socket (const char *path) /* {{{ */ { @@ -78,7 +422,7 @@ static int open_listen_socket (const char *path) /* {{{ */ sizeof (listen_fds[0]) * (listen_fds_num + 1)); if (temp == NULL) { - syslog (LOG_ERR, "open_listen_socket: realloc failed.\n"); + RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed."); return (-1); } listen_fds = temp; @@ -87,7 +431,7 @@ static int open_listen_socket (const char *path) /* {{{ */ fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); if (fd < 0) { - syslog (LOG_ERR, "open_listen_socket: socket(2) failed.\n"); + RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed."); return (-1); } @@ -98,8 +442,18 @@ static int open_listen_socket (const char *path) /* {{{ */ status = bind (fd, (struct sockaddr *) &sa, sizeof (sa)); if (status != 0) { - syslog (LOG_ERR, "open_listen_socket: bind(2) failed.\n"); + RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed."); close (fd); + unlink (path); + return (-1); + } + + status = listen (fd, /* backlog = */ 10); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed."); + close (fd); + unlink (path); return (-1); } @@ -132,41 +486,92 @@ static void *listen_thread_main (void *args) /* {{{ */ { char buffer[4096]; int status; + int i; status = open_listen_socket (RRDD_SOCK_PATH); if (status != 0) { - syslog (LOG_ERR, "listen_thread_main: open_listen_socket failed."); + RRDD_LOG (LOG_ERR, "listen_thread_main: open_listen_socket failed."); return (NULL); } while (do_shutdown == 0) { - syslog (LOG_DEBUG, "listen_thread_main: Just woke up."); + int *client_sd; + struct sockaddr_un client_sa; + socklen_t client_sa_size; + pthread_t tid; - status = recv (listen_fds[0].fd, buffer, sizeof (buffer), - /* flags = */ 0); - if (status == 0) + client_sd = (int *) malloc (sizeof (int)); + if (client_sd == NULL) { + RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed."); + sleep (120); continue; } - else if (status < 0) + + client_sa_size = sizeof (client_sa); + /* FIXME: Don't implement listen_fds as a list or use poll(2) here! */ + *client_sd = accept (listen_fds[0].fd, + (struct sockaddr *) &client_sa, &client_sa_size); + if (*client_sd < 0) { - if (errno == EINTR) - continue; + RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed."); + continue; + } + + RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.", + *client_sd); + + status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main, + /* args = */ (void *) client_sd); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed."); + close (*client_sd); + free (client_sd); + continue; + } + + /* FIXME: bug: if the new thread is run first, it may run into the + * assert (i < connetion_threads_num); + * assertion. */ + pthread_mutex_lock (&connetion_threads_lock); + { + pthread_t *temp; + + temp = (pthread_t *) realloc (connetion_threads, + sizeof (pthread_t) * (connetion_threads_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "listen_thread_main: realloc failed."); + } else { - syslog (LOG_ERR, "listen_thread_main: recv failed."); - continue; + connetion_threads = temp; + connetion_threads[connetion_threads_num] = tid; + connetion_threads_num++; } } - - syslog (LOG_DEBUG, "listen_thread_main: Received %i bytes.\n", status); + pthread_mutex_unlock (&connetion_threads_lock); } /* while (do_shutdown == 0) */ close_listen_sockets (); - syslog (LOG_DEBUG, "listen_thread_main: Exiting."); + pthread_mutex_lock (&connetion_threads_lock); + while (connetion_threads_num > 0) + { + pthread_t wait_for; + + wait_for = connetion_threads[0]; + + pthread_mutex_unlock (&connetion_threads_lock); + pthread_join (wait_for, /* retval = */ NULL); + pthread_mutex_lock (&connetion_threads_lock); + } + pthread_mutex_unlock (&connetion_threads_lock); + + RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting."); return (NULL); } /* }}} void *listen_thread_main */ @@ -176,6 +581,7 @@ static int daemonize (void) /* {{{ */ pid_t child; int status; +#if !RRDD_DEBUG child = fork (); if (child < 0) { @@ -201,6 +607,7 @@ static int daemonize (void) /* {{{ */ open ("/dev/null", O_RDWR); dup (0); dup (0); +#endif /* RRDD_DEBUG */ { struct sigaction sa; @@ -216,12 +623,19 @@ static int daemonize (void) /* {{{ */ openlog ("rrdd", LOG_PID, LOG_DAEMON); + cache_tree = avl_alloc_tree (cache_tree_compare, cache_tree_free); + if (cache_tree == NULL) + { + RRDD_LOG (LOG_ERR, "daemonize: avl_alloc_tree failed."); + return (-1); + } + memset (&queue_thread, 0, sizeof (queue_thread)); status = pthread_create (&queue_thread, /* attr = */ NULL, queue_thread_main, /* args = */ NULL); if (status != 0) { - syslog (LOG_ERR, "daemonize: pthread_create failed."); + RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed."); return (-1); } @@ -230,6 +644,8 @@ static int daemonize (void) /* {{{ */ static int cleanup (void) /* {{{ */ { + RRDD_LOG (LOG_DEBUG, "cleanup ()"); + do_shutdown++; pthread_join (queue_thread, /* return = */ NULL); -- 2.11.0