2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 * Florian octo Forster <octo at verplant.org>
23 * First tell the compiler to stick to the C99 and POSIX standards as close as
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
37 #define _POSIX_C_SOURCE 200112L
39 /* Single UNIX needed for strdup. */
43 #define _XOPEN_SOURCE 500
59 * Now for some includes..
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
70 #include <sys/types.h>
74 #include <sys/socket.h>
85 #include <glib-2.0/glib.h>
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
91 # define __attribute__(x) /**/
95 /* FIXME: I don't want to declare strdup myself! */
96 extern char *strdup (const char *s);
102 struct listen_socket_s
105 char path[PATH_MAX + 1];
107 typedef struct listen_socket_s listen_socket_t;
110 typedef struct cache_item_s cache_item_t;
116 time_t last_flush_time;
117 #define CI_FLAGS_IN_TREE 0x01
118 #define CI_FLAGS_IN_QUEUE 0x02
127 static listen_socket_t *listen_fds = NULL;
128 static size_t listen_fds_num = 0;
130 static int do_shutdown = 0;
132 static pthread_t queue_thread;
134 static pthread_t *connetion_threads = NULL;
135 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
136 static int connetion_threads_num = 0;
139 static GTree *cache_tree = NULL;
140 static cache_item_t *cache_queue_head = NULL;
141 static cache_item_t *cache_queue_tail = NULL;
142 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
143 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
145 static int config_write_interval = 300;
146 static int config_flush_interval = 3600;
148 static char **config_listen_address_list = NULL;
149 static int config_listen_address_list_len = 0;
154 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
157 } /* }}} void sig_int_handler */
159 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
162 } /* }}} void sig_term_handler */
165 * enqueue_cache_item:
166 * `cache_lock' must be acquired before calling this function!
168 static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
170 RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
176 if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
179 assert (ci->next == NULL);
181 if (cache_queue_tail == NULL)
182 cache_queue_head = ci;
184 cache_queue_tail->next = ci;
185 cache_queue_tail = ci;
188 } /* }}} int enqueue_cache_item */
191 * tree_callback_flush:
192 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
193 * while this is in progress.
195 static gboolean tree_callback_flush (gpointer key /* {{{ */
196 __attribute__((unused)), gpointer value, gpointer data)
201 key = NULL; /* make compiler happy */
203 ci = (cache_item_t *) value;
204 now = *((time_t *) data);
206 if (((now - ci->last_flush_time) >= config_write_interval)
207 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
208 enqueue_cache_item (ci);
211 } /* }}} gboolean tree_callback_flush */
213 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
216 struct timespec next_flush;
218 gettimeofday (&now, NULL);
219 next_flush.tv_sec = now.tv_sec + config_flush_interval;
220 next_flush.tv_nsec = 1000 * now.tv_usec;
222 pthread_mutex_lock (&cache_lock);
223 while ((do_shutdown == 0) || (cache_queue_head != NULL))
232 /* First, check if it's time to do the cache flush. */
233 gettimeofday (&now, NULL);
234 if ((now.tv_sec > next_flush.tv_sec)
235 || ((now.tv_sec == next_flush.tv_sec)
236 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
240 /* Pass the current time as user data so that we don't need to call
241 * `time' for each node. */
242 time_now = time (NULL);
244 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
246 /* Determine the time of the next cache flush. */
247 while (next_flush.tv_sec < now.tv_sec)
248 next_flush.tv_sec += config_flush_interval;
251 /* Now, check if there's something to store away. If not, wait until
252 * something comes in or it's time to do the cache flush. */
253 if (cache_queue_head == NULL)
255 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
256 if ((status != 0) && (status != ETIMEDOUT))
258 RRDD_LOG (LOG_ERR, "queue_thread_main: "
259 "pthread_cond_timedwait returned %i.", status);
263 /* Check if a value has arrived. This may be NULL if we timed out or there
264 * was an interrupt such as a signal. */
265 if (cache_queue_head == NULL)
268 ci = cache_queue_head;
270 /* copy the relevant parts */
271 file = strdup (ci->file);
274 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
279 values_num = ci->values_num;
284 ci->last_flush_time = time (NULL);
285 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
287 cache_queue_head = ci->next;
288 if (cache_queue_head == NULL)
289 cache_queue_tail = NULL;
292 pthread_mutex_unlock (&cache_lock);
294 RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
295 file, values_num, (void *) values);
297 status = rrd_update_r (file, NULL, values_num, (void *) values);
300 RRDD_LOG (LOG_ERR, "queue_thread_main: "
301 "rrd_update_r failed with status %i.",
306 for (i = 0; i < values_num; i++)
309 pthread_mutex_lock (&cache_lock);
310 } /* while (do_shutdown == 0) */
311 pthread_mutex_unlock (&cache_lock);
313 RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
316 } /* }}} void *queue_thread_main */
318 static int handle_request_update (int fd, /* {{{ */
319 char *buffer, int buffer_size)
334 RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
335 fd, (void *) buffer, buffer_size);
340 buffer_ptr += strlen (file) + 1;
342 pthread_mutex_lock (&cache_lock);
344 ci = g_tree_lookup (cache_tree, file);
347 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
350 pthread_mutex_unlock (&cache_lock);
351 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
354 memset (ci, 0, sizeof (cache_item_t));
356 ci->file = strdup (file);
357 if (ci->file == NULL)
359 pthread_mutex_unlock (&cache_lock);
360 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
367 ci->last_flush_time = now;
368 ci->flags = CI_FLAGS_IN_TREE;
370 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
372 RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
377 while (*buffer_ptr != 0)
382 buffer_ptr += strlen (value) + 1;
384 temp = (char **) realloc (ci->values,
385 sizeof (char *) * (ci->values_num + 1));
388 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
393 ci->values[ci->values_num] = strdup (value);
394 if (ci->values[ci->values_num] == NULL)
396 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
404 if (((now - ci->last_flush_time) >= config_write_interval)
405 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
407 enqueue_cache_item (ci);
408 pthread_cond_signal (&cache_cond);
411 pthread_mutex_unlock (&cache_lock);
413 snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
414 answer[sizeof (answer) - 1] = 0;
416 status = write (fd, answer, sizeof (answer));
420 RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
425 } /* }}} int handle_request_update */
427 static int handle_request (int fd) /* {{{ */
432 RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
434 buffer_size = read (fd, buffer, sizeof (buffer));
437 RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
440 assert (((size_t) buffer_size) <= sizeof (buffer));
442 if ((buffer[buffer_size - 2] != 0)
443 || (buffer[buffer_size - 1] != 0))
445 RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
449 /* fields in the buffer a separated by null bytes. */
450 if (strcmp (buffer, "update") == 0)
452 int offset = strlen ("update") + 1;
453 return (handle_request_update (fd, buffer + offset,
454 buffer_size - offset));
458 RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
461 } /* }}} int handle_request */
463 static void *connection_thread_main (void *args /* {{{ */
464 __attribute__((unused)))
470 fd = *((int *) args);
472 RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
473 "connetion_threads[]..");
474 pthread_mutex_lock (&connetion_threads_lock);
478 temp = (pthread_t *) realloc (connetion_threads,
479 sizeof (pthread_t) * (connetion_threads_num + 1));
482 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
486 connetion_threads = temp;
487 connetion_threads[connetion_threads_num] = pthread_self ();
488 connetion_threads_num++;
491 pthread_mutex_unlock (&connetion_threads_lock);
492 RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
494 while (do_shutdown == 0)
496 struct pollfd pollfd;
500 pollfd.events = POLLIN | POLLPRI;
503 status = poll (&pollfd, 1, /* timeout = */ 500);
504 if (status == 0) /* timeout */
506 else if (status < 0) /* error */
511 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
515 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
517 RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
518 "poll(2) returned POLLHUP.");
522 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
524 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
525 "poll(2) returned something unexpected: %#04hx",
531 status = handle_request (fd);
539 self = pthread_self ();
540 /* Remove this thread from the connection threads list */
541 pthread_mutex_lock (&connetion_threads_lock);
542 /* Find out own index in the array */
543 for (i = 0; i < connetion_threads_num; i++)
544 if (pthread_equal (connetion_threads[i], self) != 0)
546 assert (i < connetion_threads_num);
548 /* Move the trailing threads forward. */
549 if (i < (connetion_threads_num - 1))
551 memmove (connetion_threads + i,
552 connetion_threads + i + 1,
553 sizeof (pthread_t) * (connetion_threads_num - i - 1));
556 connetion_threads_num--;
557 pthread_mutex_unlock (&connetion_threads_lock);
561 } /* }}} void *connection_thread_main */
563 static int open_listen_socket_unix (const char *path) /* {{{ */
566 struct sockaddr_un sa;
567 listen_socket_t *temp;
570 temp = (listen_socket_t *) realloc (listen_fds,
571 sizeof (listen_fds[0]) * (listen_fds_num + 1));
574 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
578 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
580 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
583 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
587 memset (&sa, 0, sizeof (sa));
588 sa.sun_family = AF_UNIX;
589 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
591 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
594 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
600 status = listen (fd, /* backlog = */ 10);
603 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
609 listen_fds[listen_fds_num].fd = fd;
610 snprintf (listen_fds[listen_fds_num].path,
611 sizeof (listen_fds[listen_fds_num].path) - 1,
616 } /* }}} int open_listen_socket_unix */
618 static int open_listen_socket (const char *addr) /* {{{ */
620 struct addrinfo ai_hints;
621 struct addrinfo *ai_res;
622 struct addrinfo *ai_ptr;
625 assert (addr != NULL);
627 if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
628 return (open_listen_socket_unix (addr + strlen ("unix:")));
629 else if (addr[0] == '/')
630 return (open_listen_socket_unix (addr));
632 memset (&ai_hints, 0, sizeof (ai_hints));
633 ai_hints.ai_flags = 0;
635 ai_hints.ai_flags |= AI_ADDRCONFIG;
637 ai_hints.ai_family = AF_UNSPEC;
638 ai_hints.ai_socktype = SOCK_STREAM;
641 status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
644 RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
645 "%s", addr, gai_strerror (status));
649 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
652 listen_socket_t *temp;
654 temp = (listen_socket_t *) realloc (listen_fds,
655 sizeof (listen_fds[0]) * (listen_fds_num + 1));
658 RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
662 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
664 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
667 RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
671 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
674 RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
679 status = listen (fd, /* backlog = */ 10);
682 RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
687 listen_fds[listen_fds_num].fd = fd;
688 strncpy (listen_fds[listen_fds_num].path, addr,
689 sizeof (listen_fds[listen_fds_num].path) - 1);
694 } /* }}} int open_listen_socket */
696 static int close_listen_sockets (void) /* {{{ */
700 for (i = 0; i < listen_fds_num; i++)
702 close (listen_fds[i].fd);
703 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
704 unlink (listen_fds[i].path + strlen ("unix:"));
712 } /* }}} int close_listen_sockets */
714 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
716 struct pollfd *pollfds;
721 for (i = 0; i < config_listen_address_list_len; i++)
723 RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
724 "= %s", i, config_listen_address_list[i]);
725 open_listen_socket (config_listen_address_list[i]);
728 if (config_listen_address_list_len < 1)
729 open_listen_socket (RRDD_SOCK_PATH);
731 if (listen_fds_num < 1)
733 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
734 "could be opened. Sorry.");
738 pollfds_num = listen_fds_num;
739 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
742 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
745 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
747 while (do_shutdown == 0)
749 assert (pollfds_num == ((int) listen_fds_num));
750 for (i = 0; i < pollfds_num; i++)
752 pollfds[i].fd = listen_fds[i].fd;
753 pollfds[i].events = POLLIN | POLLPRI;
754 pollfds[i].revents = 0;
757 status = poll (pollfds, pollfds_num, /* timeout = */ -1);
763 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
768 for (i = 0; i < pollfds_num; i++)
771 struct sockaddr_storage client_sa;
772 socklen_t client_sa_size;
775 if (pollfds[i].revents == 0)
778 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
780 RRDD_LOG (LOG_ERR, "listen_thread_main: "
781 "poll(2) returned something unexpected for listen FD #%i.",
786 client_sd = (int *) malloc (sizeof (int));
787 if (client_sd == NULL)
789 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
793 client_sa_size = sizeof (client_sa);
794 *client_sd = accept (pollfds[i].fd,
795 (struct sockaddr *) &client_sa, &client_sa_size);
798 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
802 RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
805 status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
806 /* args = */ (void *) client_sd);
809 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
815 RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
817 *((unsigned long *) &tid));
818 } /* for (pollfds_num) */
819 } /* while (do_shutdown == 0) */
821 close_listen_sockets ();
823 pthread_mutex_lock (&connetion_threads_lock);
824 while (connetion_threads_num > 0)
828 wait_for = connetion_threads[0];
830 pthread_mutex_unlock (&connetion_threads_lock);
831 pthread_join (wait_for, /* retval = */ NULL);
832 pthread_mutex_lock (&connetion_threads_lock);
834 pthread_mutex_unlock (&connetion_threads_lock);
836 RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
839 } /* }}} void *listen_thread_main */
841 static int daemonize (void) /* {{{ */
849 fprintf (stderr, "daemonize: fork(2) failed.\n");
857 /* Change into the /tmp directory. */
860 /* Become session leader */
863 /* Open the first three file descriptors to /dev/null */
868 open ("/dev/null", O_RDWR);
875 memset (&sa, 0, sizeof (sa));
876 sa.sa_handler = sig_int_handler;
877 sigaction (SIGINT, &sa, NULL);
879 memset (&sa, 0, sizeof (sa));
880 sa.sa_handler = sig_term_handler;
881 sigaction (SIGINT, &sa, NULL);
883 memset (&sa, 0, sizeof (sa));
884 sa.sa_handler = SIG_IGN;
885 sigaction (SIGPIPE, &sa, NULL);
888 openlog ("rrdd", LOG_PID, LOG_DAEMON);
890 cache_tree = g_tree_new ((GCompareFunc) strcmp);
891 if (cache_tree == NULL)
893 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
897 memset (&queue_thread, 0, sizeof (queue_thread));
898 status = pthread_create (&queue_thread, /* attr = */ NULL,
899 queue_thread_main, /* args = */ NULL);
902 RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
907 } /* }}} int daemonize */
909 static int cleanup (void) /* {{{ */
911 RRDD_LOG (LOG_DEBUG, "cleanup ()");
915 RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
916 pthread_cond_signal (&cache_cond);
917 pthread_join (queue_thread, /* return = */ NULL);
918 RRDD_LOG (LOG_DEBUG, "cleanup: done");
923 } /* }}} int cleanup */
925 static int read_options (int argc, char **argv) /* {{{ */
930 while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
938 temp = (char **) realloc (config_listen_address_list,
939 sizeof (char *) * (config_listen_address_list_len + 1));
942 fprintf (stderr, "read_options: realloc failed.\n");
945 config_listen_address_list = temp;
947 temp[config_listen_address_list_len] = strdup (optarg);
948 if (temp[config_listen_address_list_len] == NULL)
950 fprintf (stderr, "read_options: strdup failed.\n");
953 config_listen_address_list_len++;
961 temp = atoi (optarg);
963 config_flush_interval = temp;
966 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
976 temp = atoi (optarg);
978 config_write_interval = temp;
981 fprintf (stderr, "Invalid write interval: %s\n", optarg);
989 printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n"
991 "Usage: rrdd [options]\n"
993 "Valid options are:\n"
994 " -l <address> Socket address to listen to.\n"
995 " -w <seconds> Interval in which to write data.\n"
996 " -f <seconds> Interval in which to flush dead data.\n"
998 "For more information and a detailed description of all options "
1000 "to the rrdd(1) manual page.\n",
1004 } /* switch (option) */
1005 } /* while (getopt) */
1008 } /* }}} int read_options */
1010 int main (int argc, char **argv)
1014 status = read_options (argc, argv);
1022 status = daemonize ();
1025 struct sigaction sigchld;
1027 memset (&sigchld, 0, sizeof (sigchld));
1028 sigchld.sa_handler = SIG_IGN;
1029 sigaction (SIGCHLD, &sigchld, NULL);
1033 else if (status != 0)
1035 fprintf (stderr, "daemonize failed, exiting.\n");
1039 listen_thread_main (NULL);
1047 * vim: set sw=2 sts=2 ts=8 et fdm=marker :