2 * collectd - src/rrdd.c
3 * Copyright (C) 2008 Florian octo Forster
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 * Florian octo Forster <octo at verplant.org>
25 #include <glib-2.0/glib.h>
/* RRDD_LOG (severity, fmt, ...): logging macro with two alternative
 * definitions.  The preprocessor conditional choosing between them is not
 * visible in this excerpt (presumably RRDD_DEBUG -- TODO confirm). */
/* Variant 1: ignores `severity'; prints the formatted message followed by a
 * newline to stderr. */
29 # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
/* Variant 2: passes `severity' and the format arguments to syslog(). */
31 # define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
/* One listening socket.  Only `path' is visible in this excerpt; the open_*
 * functions also store the descriptor in a member named `fd'. */
37 struct listen_socket_s
/* Address/path the socket was opened with; close_listen_sockets() inspects
 * it for a "unix:" prefix to decide whether to unlink the socket file. */
40 char path[PATH_MAX + 1];
42 typedef struct listen_socket_s listen_socket_t;
/* Cached, not yet written values for one RRD file.  The struct body is only
 * partially visible; other members used elsewhere in this file are `file',
 * `values', `values_num', `flags' and `next'. */
45 typedef struct cache_item_s cache_item_t;
/* Set to the current time when the item is created and again each time the
 * queue thread dequeues it. */
51 time_t last_flush_time;
/* Bits for cache_item_t.flags. */
/* Set when the item is created and inserted into cache_tree. */
52 #define CI_FLAGS_IN_TREE 0x01
/* Tested before enqueueing and cleared by queue_thread_main() on dequeue. */
53 #define CI_FLAGS_IN_QUEUE 0x02
/* Array of open listening sockets, grown with realloc() by
 * open_listen_socket() / open_listen_socket_unix(). */
62 static listen_socket_t *listen_fds = NULL;
63 static size_t listen_fds_num = 0;
/* Polled by the queue, connection and listen loops; they run while this is
 * zero.  Where it is set is not visible in this excerpt (presumably the
 * signal handlers -- TODO confirm). */
65 static int do_shutdown = 0;
/* Thread running queue_thread_main(); created in daemonize(), joined in
 * cleanup(). */
67 static pthread_t queue_thread;
/* IDs of the currently running connection threads.  Each thread adds and
 * removes itself; all access happens under connetion_threads_lock. */
69 static pthread_t *connetion_threads = NULL;
70 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
71 static int connetion_threads_num = 0;
/* File name -> cache_item_t map, plus a singly linked queue (head/tail) of
 * items waiting to be written.  Both are accessed with cache_lock held;
 * cache_cond is signalled when an item is appended to the queue. */
74 static GTree *cache_tree = NULL;
75 static cache_item_t *cache_queue_head = NULL;
76 static cache_item_t *cache_queue_tail = NULL;
77 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
78 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
/* Minimum age, in seconds as measured by time(), of an item's last flush
 * before new data causes it to be queued for writing. */
80 static int config_write_interval = 300;
/* Configurable via read_options(); no consumer of this value is visible in
 * this excerpt. */
81 static int config_flush_interval = 3600;
/* Listen addresses collected by read_options(); each entry is a strdup()ed
 * copy of the option argument. */
83 static char **config_listen_address_list = NULL;
84 static int config_listen_address_list_len = 0;
/* Signal handler that daemonize() installs for SIGINT.  The body is not
 * visible in this excerpt (presumably it sets do_shutdown -- TODO confirm). */
89 static void sig_int_handler (int signal) /* {{{ */
92 } /* }}} void sig_int_handler */
/* Signal handler registered through sigaction() in daemonize(); judging by
 * its name it is meant for SIGTERM.  The body is not visible in this excerpt
 * (presumably it sets do_shutdown -- TODO confirm). */
94 static void sig_term_handler (int signal) /* {{{ */
97 } /* }}} void sig_term_handler */
/* Thread body of the writer thread (started by daemonize()).
 *
 * Repeatedly takes the first item off the update queue, copies what it needs
 * while holding cache_lock, releases the lock and hands the data to
 * rrd_update_r().  `args' is not used in the visible lines. */
99 static void *queue_thread_main (void *args) /* {{{ */
101 pthread_mutex_lock (&cache_lock);
/* Keep going after shutdown was requested until the queue has drained. */
102 while ((do_shutdown == 0) || (cache_queue_head != NULL))
/* Nothing queued: sleep until a producer signals cache_cond. */
112 if (cache_queue_head == NULL)
113 pthread_cond_wait (&cache_cond, &cache_lock);
/* Re-check after waking up; the action taken here is not visible in this
 * excerpt (presumably `continue' -- TODO confirm). */
115 if (cache_queue_head == NULL)
118 ci = cache_queue_head;
120 /* copy the relevant parts */
121 file = strdup (ci->file);
124 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
129 values_num = ci->values_num;
/* Mark the item as flushed now and no longer queued. */
134 ci->last_flush_time = time (NULL);
135 ci->flags &= ~(CI_FLAGS_IN_QUEUE);
/* Unlink the item from the head; an empty queue also has no tail. */
137 cache_queue_head = ci->next;
138 if (cache_queue_head == NULL)
139 cache_queue_tail = NULL;
/* The update itself runs without the cache lock. */
142 pthread_mutex_unlock (&cache_lock);
144 RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
145 file, values_num, (void *) values);
147 status = rrd_update_r (file, NULL, values_num, (void *) values);
150 RRDD_LOG (LOG_ERR, "queue_thread_main: "
151 "rrd_update_r failed with status %i.",
/* Per-value loop; its body is not visible here (presumably frees the copied
 * value strings -- TODO confirm). */
156 for (i = 0; i < values_num; i++)
/* Re-acquire the lock before evaluating the loop condition again. */
159 pthread_mutex_lock (&cache_lock);
160 } /* while (do_shutdown == 0) */
161 pthread_mutex_unlock (&cache_lock);
163 RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
166 } /* }}} void *queue_thread_main */
/* Handles the payload of an "update" request.
 *
 * fd          - client socket; the textual answer is written to it.
 * buffer      - request payload after the command word: a file name followed
 *               by values, each NUL-terminated, ending at an empty string.
 * buffer_size - number of bytes in `buffer'.
 *
 * The values are appended to the cache item for the file (a new item is
 * allocated and inserted into cache_tree when needed).  If the item was last
 * flushed at least config_write_interval seconds ago and is not queued yet,
 * it is appended to the update queue and the queue thread is signalled.
 * Return statements are not visible in this excerpt. */
168 static int handle_request_update (int fd, /* {{{ */
169 char *buffer, int buffer_size)
184 RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
185 fd, (void *) buffer, buffer_size);
/* Skip past the file name and its terminating NUL. */
190 buffer_ptr += strlen (file) + 1;
/* The tree, the item and the queue are all modified under cache_lock. */
192 pthread_mutex_lock (&cache_lock);
194 ci = g_tree_lookup (cache_tree, file);
197 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
200 pthread_mutex_unlock (&cache_lock);
201 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
204 memset (ci, 0, sizeof (cache_item_t));
206 ci->file = strdup (file);
207 if (ci->file == NULL)
209 pthread_mutex_unlock (&cache_lock);
210 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
217 ci->last_flush_time = now;
218 ci->flags = CI_FLAGS_IN_TREE;
/* The tree key is the item's own copy of the file name. */
220 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
222 RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
/* Consume NUL-separated values until the empty string is reached. */
227 while (*buffer_ptr != 0)
232 buffer_ptr += strlen (value) + 1;
/* Grow the value array by one slot via a temporary pointer. */
234 temp = (char **) realloc (ci->values,
235 sizeof (char *) * (ci->values_num + 1));
238 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
243 ci->values[ci->values_num] = strdup (value);
244 if (ci->values[ci->values_num] == NULL)
246 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
/* Queue the item only if it is old enough and not already queued. */
254 if (((now - ci->last_flush_time) >= config_write_interval)
255 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
257 RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
260 assert (ci->next == NULL);
/* Append at the tail; an empty queue gets the item as its head too. */
262 if (cache_queue_tail == NULL)
263 cache_queue_head = ci;
265 cache_queue_tail->next = ci;
266 cache_queue_tail = ci;
/* Wake the queue thread. */
268 pthread_cond_signal (&cache_cond);
271 pthread_mutex_unlock (&cache_lock);
273 snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
274 answer[sizeof (answer) - 1] = 0;
/* Send only the formatted text.  Passing sizeof (answer) here would also
 * transmit the bytes after the terminating NUL, which snprintf() never
 * wrote. */
276 status = write (fd, answer, strlen (answer));
280 RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
285 } /* }}} int handle_request_update */
/* Reads one request from `fd' and dispatches it by command word.
 *
 * A request is a sequence of NUL-terminated fields and must end in two NUL
 * bytes.  Only the "update" command is recognised; anything else is logged
 * as unknown.  Return statements are not visible in this excerpt. */
287 static int handle_request (int fd) /* {{{ */
292 RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
294 buffer_size = read (fd, buffer, sizeof (buffer));
297 RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
300 assert (((size_t) buffer_size) <= sizeof (buffer));
/* A well-formed request ends in two NUL bytes.  The size check that follows
 * read(2) is not visible in this excerpt, so guard here as well: with fewer
 * than two bytes the index buffer_size - 2 would point before the buffer. */
302 if ((buffer_size < 2) || (buffer[buffer_size - 2] != 0)
303 || (buffer[buffer_size - 1] != 0))
305 RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
309 /* fields in the buffer are separated by null bytes. */
310 if (strcmp (buffer, "update") == 0)
/* Hand over only the payload that follows "update" and its NUL. */
312 int offset = strlen ("update") + 1;
313 return (handle_request_update (fd, buffer + offset,
314 buffer_size - offset));
318 RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
321 } /* }}} int handle_request */
/* Thread body serving one client connection.
 *
 * args - pointer to an int holding the client socket descriptor
 *        (listen_thread_main() passes a malloc()ed int).
 *
 * The thread records itself in connetion_threads[], serves requests until
 * shutdown is requested or the loop is left (exit paths are only partly
 * visible here), and finally removes itself from the array again. */
323 static void *connection_thread_main (void *args) /* {{{ */
329 fd = *((int *) args);
331 RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
332 "connetion_threads[]..");
333 pthread_mutex_lock (&connetion_threads_lock);
/* Grow the array by one entry via a temporary pointer. */
337 temp = (pthread_t *) realloc (connetion_threads,
338 sizeof (pthread_t) * (connetion_threads_num + 1));
341 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
345 connetion_threads = temp;
346 connetion_threads[connetion_threads_num] = pthread_self ();
347 connetion_threads_num++;
350 pthread_mutex_unlock (&connetion_threads_lock);
351 RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
353 while (do_shutdown == 0)
355 struct pollfd pollfd;
359 pollfd.events = POLLIN | POLLPRI;
/* The 500 ms timeout bounds how long it takes to notice do_shutdown. */
362 status = poll (&pollfd, 1, /* timeout = */ 500);
363 if (status == 0) /* timeout */
365 else if (status < 0) /* error */
370 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
374 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
376 RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
377 "poll(2) returned POLLHUP.");
/* Neither hang-up nor readable data: log the raw event mask. */
381 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
383 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
384 "poll(2) returned something unexpected: %#04hx",
/* Data is available: read and process one request. */
390 status = handle_request (fd);
398 self = pthread_self ();
399 /* Remove this thread from the connection threads list */
400 pthread_mutex_lock (&connetion_threads_lock);
401 /* Find out own index in the array */
402 for (i = 0; i < connetion_threads_num; i++)
403 if (pthread_equal (connetion_threads[i], self) != 0)
/* The thread registered itself above, so it is expected to be found. */
405 assert (i < connetion_threads_num);
407 /* Move the trailing threads forward. */
408 if (i < (connetion_threads_num - 1))
410 memmove (connetion_threads + i,
411 connetion_threads + i + 1,
412 sizeof (pthread_t) * (connetion_threads_num - i - 1));
415 connetion_threads_num--;
416 pthread_mutex_unlock (&connetion_threads_lock);
420 } /* }}} void *connection_thread_main */
/* Opens a listening UNIX domain stream socket bound to `path' and records it
 * as a new entry at the end of listen_fds[].
 *
 * Error paths and return statements are not visible in this excerpt. */
422 static int open_listen_socket_unix (const char *path) /* {{{ */
425 struct sockaddr_un sa;
426 listen_socket_t *temp;
/* Make room for one more entry.  NOTE(review): the assignment of `temp' back
 * to listen_fds is not visible here -- confirm it precedes the memset. */
429 temp = (listen_socket_t *) realloc (listen_fds,
430 sizeof (listen_fds[0]) * (listen_fds_num + 1));
433 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
/* Zero the new slot, so that `path' stays NUL-terminated after the bounded
 * write below. */
437 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
439 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
442 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
/* `sa' is zeroed first and at most sizeof (sun_path) - 1 bytes are copied,
 * so sun_path is always NUL-terminated. */
446 memset (&sa, 0, sizeof (sa));
447 sa.sun_family = AF_UNIX;
448 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
450 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
453 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
459 status = listen (fd, /* backlog = */ 10);
462 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
/* Record the descriptor and a printable form of the address.  The format
 * arguments are not visible here; close_listen_sockets() expects a "unix:"
 * prefix on such entries -- TODO confirm the format matches. */
468 listen_fds[listen_fds_num].fd = fd;
469 snprintf (listen_fds[listen_fds_num].path,
470 sizeof (listen_fds[listen_fds_num].path) - 1,
475 } /* }}} int open_listen_socket_unix */
/* Opens listening sockets for `addr'.
 *
 * Addresses starting with "unix:" or with '/' are passed to
 * open_listen_socket_unix().  Anything else is resolved with getaddrinfo()
 * (service DEFAULT_PORT, stream sockets, any address family) and the results
 * are iterated, each one going through socket/bind/listen before being
 * recorded in listen_fds[].  Error paths, cleanup of the getaddrinfo()
 * result and return statements are not visible in this excerpt. */
477 static int open_listen_socket (const char *addr) /* {{{ */
479 struct addrinfo ai_hints;
480 struct addrinfo *ai_res;
481 struct addrinfo *ai_ptr;
484 assert (addr != NULL);
/* "unix:<path>" and absolute paths denote UNIX domain sockets. */
486 if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
487 return (open_listen_socket_unix (addr + strlen ("unix:")));
488 else if (addr[0] == '/')
489 return (open_listen_socket_unix (addr));
491 memset (&ai_hints, 0, sizeof (ai_hints));
492 ai_hints.ai_flags = 0;
/* Presumably wrapped in an #ifdef AI_ADDRCONFIG that is not visible here --
 * TODO confirm. */
494 ai_hints.ai_flags |= AI_ADDRCONFIG;
496 ai_hints.ai_family = AF_UNSPEC;
497 ai_hints.ai_socktype = SOCK_STREAM;
500 status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
503 RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
504 "%s", addr, gai_strerror (status));
/* Walk every address returned by the resolver. */
508 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
511 listen_socket_t *temp;
/* Make room for one more entry.  NOTE(review): the assignment of `temp' back
 * to listen_fds is not visible here -- confirm it precedes the memset. */
513 temp = (listen_socket_t *) realloc (listen_fds,
514 sizeof (listen_fds[0]) * (listen_fds_num + 1));
517 RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
521 memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
523 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
526 RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
530 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
533 RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
538 status = listen (fd, /* backlog = */ 10);
541 RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
/* The slot was zeroed above and at most sizeof (path) - 1 bytes are copied,
 * so `path' stays NUL-terminated. */
546 listen_fds[listen_fds_num].fd = fd;
547 strncpy (listen_fds[listen_fds_num].path, addr,
548 sizeof (listen_fds[listen_fds_num].path) - 1);
553 } /* }}} int open_listen_socket */
/* Closes every descriptor in listen_fds[].  For entries whose stored path
 * starts with "unix:", the remainder of the path is unlink()ed as well.
 * Any further cleanup and the return value are not visible in this
 * excerpt. */
555 static int close_listen_sockets (void) /* {{{ */
559 for (i = 0; i < listen_fds_num; i++)
561 close (listen_fds[i].fd);
/* Remove the socket file; the "unix:" prefix itself is skipped. */
562 if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
563 unlink (listen_fds[i].path + strlen ("unix:"));
571 } /* }}} int close_listen_sockets */
/* Accept loop; main() calls it directly with a NULL argument.
 *
 * Opens all configured listen addresses (or RRDD_SOCK_PATH when none were
 * configured), then polls the listening sockets until shutdown is requested.
 * Each accepted connection gets its own thread running
 * connection_thread_main().  On shutdown the listen sockets are closed and
 * the remaining connection threads are joined.  `args' is not used in the
 * visible lines; error paths and return statements are not visible either. */
573 static void *listen_thread_main (void *args) /* {{{ */
575 struct pollfd *pollfds;
580 for (i = 0; i < config_listen_address_list_len; i++)
582 RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
583 "= %s", i, config_listen_address_list[i]);
584 open_listen_socket (config_listen_address_list[i]);
/* No address configured: fall back to the default socket path. */
587 if (config_listen_address_list_len < 1)
588 open_listen_socket (RRDD_SOCK_PATH);
590 if (listen_fds_num < 1)
592 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
593 "could be opened. Sorry.");
/* One pollfd per listening socket. */
597 pollfds_num = listen_fds_num;
598 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
601 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
604 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
606 while (do_shutdown == 0)
608 assert (pollfds_num == listen_fds_num);
/* Re-arm every entry before each poll() call. */
609 for (i = 0; i < pollfds_num; i++)
611 pollfds[i].fd = listen_fds[i].fd;
612 pollfds[i].events = POLLIN | POLLPRI;
613 pollfds[i].revents = 0;
/* Blocks without timeout. */
616 status = poll (pollfds, pollfds_num, /* timeout = */ -1);
622 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
627 for (i = 0; i < pollfds_num; i++)
630 struct sockaddr_storage client_sa;
631 socklen_t client_sa_size;
634 if (pollfds[i].revents == 0)
637 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
639 RRDD_LOG (LOG_ERR, "listen_thread_main: "
640 "poll(2) returned something unexpected for listen FD #%i.",
/* The descriptor is passed to the new thread through a heap-allocated int. */
645 client_sd = (int *) malloc (sizeof (int));
646 if (client_sd == NULL)
648 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
652 client_sa_size = sizeof (client_sa);
653 *client_sd = accept (pollfds[i].fd,
654 (struct sockaddr *) &client_sa, &client_sa_size);
657 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
661 RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
664 status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
665 /* args = */ (void *) client_sd);
668 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
/* NOTE(review): the cast below reinterprets pthread_t as unsigned long for
 * logging; pthread_t is an opaque type, so this is not portable. */
674 RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
676 *((unsigned long *) &tid));
677 } /* for (pollfds_num) */
678 } /* while (do_shutdown == 0) */
680 close_listen_sockets ();
/* Join the remaining connection threads one at a time.  The lock is dropped
 * around pthread_join() because each thread takes the same lock to remove
 * itself from the array before it exits. */
682 pthread_mutex_lock (&connetion_threads_lock);
683 while (connetion_threads_num > 0)
687 wait_for = connetion_threads[0];
689 pthread_mutex_unlock (&connetion_threads_lock);
690 pthread_join (wait_for, /* retval = */ NULL);
691 pthread_mutex_lock (&connetion_threads_lock);
693 pthread_mutex_unlock (&connetion_threads_lock);
695 RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
698 } /* }}} void *listen_thread_main */
/* Prepares the process for running as a daemon.
 *
 * Unless compiled out (see the RRDD_DEBUG #endif below) this forks and
 * re-opens the standard descriptors on /dev/null.  It then installs the
 * signal handlers, opens syslog, creates the cache tree and starts the
 * queue thread.
 *
 * Return statements are not visible in this excerpt; main() treats a non-zero
 * result as special (presumably 1 = parent after fork, other = failure --
 * TODO confirm). */
700 static int daemonize (void) /* {{{ */
711 fprintf (stderr, "daemonize: fork(2) failed.\n");
719 /* Change into the /tmp directory. */
722 /* Become session leader */
725 /* Open the first three file descriptors to /dev/null */
730 open ("/dev/null", O_RDWR);
733 #endif /* RRDD_DEBUG */
738 memset (&sa, 0, sizeof (sa));
739 sa.sa_handler = sig_int_handler;
740 sigaction (SIGINT, &sa, NULL);
742 memset (&sa, 0, sizeof (sa));
743 sa.sa_handler = sig_term_handler;
/* Install the TERM handler for SIGTERM.  Registering it for SIGINT would
 * overwrite the SIGINT handler installed just above and leave SIGTERM with
 * its default action. */
744 sigaction (SIGTERM, &sa, NULL);
/* Ignore SIGPIPE, so that a write to a closed client socket does not raise
 * the signal. */
746 memset (&sa, 0, sizeof (sa));
747 sa.sa_handler = SIG_IGN;
748 sigaction (SIGPIPE, &sa, NULL);
751 openlog ("rrdd", LOG_PID, LOG_DAEMON);
/* Cache items are keyed by file name and compared with strcmp(). */
753 cache_tree = g_tree_new ((GCompareFunc) strcmp);
754 if (cache_tree == NULL)
756 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
/* Start the writer thread that drains the update queue. */
760 memset (&queue_thread, 0, sizeof (queue_thread));
761 status = pthread_create (&queue_thread, /* attr = */ NULL,
762 queue_thread_main, /* args = */ NULL);
765 RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
770 } /* }}} int daemonize */
/* Shuts down the queue thread: wakes it through cache_cond and waits for it
 * to exit.  Other steps and the return value are not visible in this
 * excerpt. */
772 static int cleanup (void) /* {{{ */
774 RRDD_LOG (LOG_DEBUG, "cleanup ()");
778 RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
/* Wake the queue thread in case it is blocked in pthread_cond_wait().
 * NOTE(review): no cache_lock acquisition is visible around this signal; if
 * there is none, a wake-up sent just before the thread starts waiting could
 * be missed -- confirm. */
779 pthread_cond_signal (&cache_cond);
780 pthread_join (queue_thread, /* return = */ NULL);
781 RRDD_LOG (LOG_DEBUG, "cleanup: done");
786 } /* }}} int cleanup */
/* Parses the command line with getopt().
 *
 * Accepted options, per the option string and the usage text below:
 *   -l <address>  append a listen address to config_listen_address_list
 *   -f <seconds>  flush interval (config_flush_interval)
 *   -w <seconds>  write interval (config_write_interval)
 *   -h, -?        print the usage text
 * The `case' labels, the validity checks on the parsed numbers and the return
 * statements are not visible in this excerpt. */
788 static int read_options (int argc, char **argv) /* {{{ */
793 while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
/* Grow the address list by one slot via a temporary pointer, then store a
 * private copy of the argument. */
801 temp = (char **) realloc (config_listen_address_list,
802 sizeof (char *) * (config_listen_address_list_len + 1));
805 fprintf (stderr, "read_options: realloc failed.\n");
808 config_listen_address_list = temp;
810 temp[config_listen_address_list_len] = strdup (optarg);
811 if (temp[config_listen_address_list_len] == NULL)
813 fprintf (stderr, "read_options: strdup failed.\n");
816 config_listen_address_list_len++;
/* Flush interval.  atoi() cannot report parse errors; the condition under
 * which the value is accepted is not visible here. */
824 temp = atoi (optarg);
826 config_flush_interval = temp;
829 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
/* Write interval, handled the same way. */
839 temp = atoi (optarg);
841 config_write_interval = temp;
844 fprintf (stderr, "Invalid write interval: %s\n", optarg);
/* Usage text. */
852 printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n"
854 "Usage: rrdd [options]\n"
856 "Valid options are:\n"
857 " -l <address> Socket address to listen to.\n"
858 " -w <seconds> Interval in which to write data.\n"
859 " -f <seconds> Interval in which to flush dead data.\n"
861 "For more information and a detailed description of all options "
863 "to the rrdd(1) manual page.\n",
867 } /* switch (option) */
868 } /* while (getopt) */
871 } /* }}} int read_options */
/* Program entry point: parses the options, daemonizes and then runs the
 * accept loop in the calling thread.  The conditions on `status' between the
 * visible lines, and the final cleanup/return, are not visible in this
 * excerpt. */
873 int main (int argc, char **argv)
877 status = read_options (argc, argv);
885 status = daemonize ();
/* Branch whose condition is not visible (presumably the parent process after
 * fork -- TODO confirm): SIGCHLD is set to be ignored. */
888 struct sigaction sigchld;
890 memset (&sigchld, 0, sizeof (sigchld));
891 sigchld.sa_handler = SIG_IGN;
892 sigaction (SIGCHLD, &sigchld, NULL);
/* Any other non-zero status from daemonize() is reported as a failure. */
896 else if (status != 0)
898 fprintf (stderr, "daemonize failed, exiting.\n");
/* Runs the listen loop directly rather than in a separate thread. */
902 listen_thread_main (NULL);
910 * vim: set sw=2 sts=2 ts=8 et fdm=marker :