src/rrd_daemon.c: Add caching daemon.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 #if 0
95 /* FIXME: I don't want to declare strdup myself! */
96 extern char *strdup (const char *s);
97 #endif
98
99 /*
100  * Types
101  */
102 struct listen_socket_s
103 {
104   int fd;
105   char path[PATH_MAX + 1];
106 };
107 typedef struct listen_socket_s listen_socket_t;
108
109 struct cache_item_s;
110 typedef struct cache_item_s cache_item_t;
111 struct cache_item_s
112 {
113   char *file;
114   char **values;
115   int values_num;
116   time_t last_flush_time;
117 #define CI_FLAGS_IN_TREE  0x01
118 #define CI_FLAGS_IN_QUEUE 0x02
119   int flags;
120
121   cache_item_t *next;
122 };
123
124 /*
125  * Variables
126  */
127 static listen_socket_t *listen_fds = NULL;
128 static size_t listen_fds_num = 0;
129
130 static int do_shutdown = 0;
131
132 static pthread_t queue_thread;
133
134 static pthread_t *connetion_threads = NULL;
135 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
136 static int connetion_threads_num = 0;
137
138 /* Cache stuff */
139 static GTree          *cache_tree = NULL;
140 static cache_item_t   *cache_queue_head = NULL;
141 static cache_item_t   *cache_queue_tail = NULL;
142 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
143 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
144
145 static int config_write_interval = 300;
146 static int config_flush_interval = 3600;
147
148 static char **config_listen_address_list = NULL;
149 static int config_listen_address_list_len = 0;
150
151 /* 
152  * Functions
153  */
154 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
155 {
156   do_shutdown++;
157 } /* }}} void sig_int_handler */
158
159 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
160 {
161   do_shutdown++;
162 } /* }}} void sig_term_handler */
163
164 /*
165  * enqueue_cache_item:
166  * `cache_lock' must be acquired before calling this function!
167  */
168 static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
169 {
170   RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
171       ci->file);
172
173   if (ci == NULL)
174     return (-1);
175
176   if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
177     return (-1);
178
179   assert (ci->next == NULL);
180
181   if (cache_queue_tail == NULL)
182     cache_queue_head = ci;
183   else
184     cache_queue_tail->next = ci;
185   cache_queue_tail = ci;
186
187   return (0);
188 } /* }}} int enqueue_cache_item */
189
190 /*
191  * tree_callback_flush:
192  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
193  * while this is in progress.
194  */
195 static gboolean tree_callback_flush (gpointer key /* {{{ */
196     __attribute__((unused)), gpointer value, gpointer data)
197 {
198   cache_item_t *ci;
199   time_t now;
200
201   key = NULL; /* make compiler happy */
202
203   ci = (cache_item_t *) value;
204   now = *((time_t *) data);
205
206   if (((now - ci->last_flush_time) >= config_write_interval)
207       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
208     enqueue_cache_item (ci);
209
210   return (TRUE);
211 } /* }}} gboolean tree_callback_flush */
212
213 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
214 {
215   struct timeval now;
216   struct timespec next_flush;
217
218   gettimeofday (&now, NULL);
219   next_flush.tv_sec = now.tv_sec + config_flush_interval;
220   next_flush.tv_nsec = 1000 * now.tv_usec;
221
222   pthread_mutex_lock (&cache_lock);
223   while ((do_shutdown == 0) || (cache_queue_head != NULL))
224   {
225     cache_item_t *ci;
226     char *file;
227     char **values;
228     int values_num;
229     int status;
230     int i;
231
232     /* First, check if it's time to do the cache flush. */
233     gettimeofday (&now, NULL);
234     if ((now.tv_sec > next_flush.tv_sec)
235         || ((now.tv_sec == next_flush.tv_sec)
236           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
237     {
238       time_t time_now;
239
240       /* Pass the current time as user data so that we don't need to call
241        * `time' for each node. */
242       time_now = time (NULL);
243
244       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
245
246       /* Determine the time of the next cache flush. */
247       while (next_flush.tv_sec < now.tv_sec)
248         next_flush.tv_sec += config_flush_interval;
249     }
250
251     /* Now, check if there's something to store away. If not, wait until
252      * something comes in or it's time to do the cache flush. */
253     if (cache_queue_head == NULL)
254     {
255       struct timespec timeout;
256
257       timeout.tv_sec = next_flush.tv_sec - now.tv_sec;
258       if (next_flush.tv_nsec < (1000 * now.tv_usec))
259       {
260         timeout.tv_sec--;
261         timeout.tv_nsec = 1000000000 + next_flush.tv_nsec
262           - (1000 * now.tv_usec);
263       }
264       else
265       {
266         timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec);
267       }
268
269       pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout);
270     }
271
272     /* Check if a value has arrived. This may be NULL if we timed out or there
273      * was an interrupt such as a signal. */
274     if (cache_queue_head == NULL)
275       continue;
276
277     ci = cache_queue_head;
278
279     /* copy the relevant parts */
280     file = strdup (ci->file);
281     if (file == NULL)
282     {
283       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
284       continue;
285     }
286
287     values = ci->values;
288     values_num = ci->values_num;
289
290     ci->values = NULL;
291     ci->values_num = 0;
292
293     ci->last_flush_time = time (NULL);
294     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
295
296     cache_queue_head = ci->next;
297     if (cache_queue_head == NULL)
298       cache_queue_tail = NULL;
299     ci->next = NULL;
300
301     pthread_mutex_unlock (&cache_lock);
302
303     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
304         file, values_num, (void *) values);
305
306     status = rrd_update_r (file, NULL, values_num, (void *) values);
307     if (status != 0)
308     {
309       RRDD_LOG (LOG_ERR, "queue_thread_main: "
310           "rrd_update_r failed with status %i.",
311           status);
312     }
313
314     free (file);
315     for (i = 0; i < values_num; i++)
316       free (values[i]);
317
318     pthread_mutex_lock (&cache_lock);
319   } /* while (do_shutdown == 0) */
320   pthread_mutex_unlock (&cache_lock);
321
322   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
323
324   return (NULL);
325 } /* }}} void *queue_thread_main */
326
327 static int handle_request_update (int fd, /* {{{ */
328     char *buffer, int buffer_size)
329 {
330   char *file;
331   char *value;
332   char *buffer_ptr;
333   int values_num = 0;
334   int status;
335
336   time_t now;
337
338   cache_item_t *ci;
339   char answer[4096];
340
341   now = time (NULL);
342
343   RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
344       fd, (void *) buffer, buffer_size);
345
346   buffer_ptr = buffer;
347
348   file = buffer_ptr;
349   buffer_ptr += strlen (file) + 1;
350
351   pthread_mutex_lock (&cache_lock);
352
353   ci = g_tree_lookup (cache_tree, file);
354   if (ci == NULL)
355   {
356     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
357     if (ci == NULL)
358     {
359       pthread_mutex_unlock (&cache_lock);
360       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
361       return (-1);
362     }
363     memset (ci, 0, sizeof (cache_item_t));
364
365     ci->file = strdup (file);
366     if (ci->file == NULL)
367     {
368       pthread_mutex_unlock (&cache_lock);
369       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
370       free (ci);
371       return (-1);
372     }
373
374     ci->values = NULL;
375     ci->values_num = 0;
376     ci->last_flush_time = now;
377     ci->flags = CI_FLAGS_IN_TREE;
378
379     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
380
381     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
382         ci->file);
383   }
384   assert (ci != NULL);
385
386   while (*buffer_ptr != 0)
387   {
388     char **temp;
389
390     value = buffer_ptr;
391     buffer_ptr += strlen (value) + 1;
392
393     temp = (char **) realloc (ci->values,
394         sizeof (char *) * (ci->values_num + 1));
395     if (temp == NULL)
396     {
397       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
398       continue;
399     }
400     ci->values = temp;
401
402     ci->values[ci->values_num] = strdup (value);
403     if (ci->values[ci->values_num] == NULL)
404     {
405       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
406       continue;
407     }
408     ci->values_num++;
409
410     values_num++;
411   }
412
413   if (((now - ci->last_flush_time) >= config_write_interval)
414       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
415   {
416     enqueue_cache_item (ci);
417     pthread_cond_signal (&cache_cond);
418   }
419
420   pthread_mutex_unlock (&cache_lock);
421
422   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
423   answer[sizeof (answer) - 1] = 0;
424
425   status = write (fd, answer, sizeof (answer));
426   if (status < 0)
427   {
428     status = errno;
429     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
430     return (status);
431   }
432
433   return (0);
434 } /* }}} int handle_request_update */
435
436 static int handle_request (int fd) /* {{{ */
437 {
438   char buffer[4096];
439   int buffer_size;
440
441   RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
442
443   buffer_size = read (fd, buffer, sizeof (buffer));
444   if (buffer_size < 1)
445   {
446     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
447     return (-1);
448   }
449   assert (((size_t) buffer_size) <= sizeof (buffer));
450
451   if ((buffer[buffer_size - 2] != 0)
452       || (buffer[buffer_size - 1] != 0))
453   {
454     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
455     return (-1);
456   }
457
458   /* fields in the buffer a separated by null bytes. */
459   if (strcmp (buffer, "update") == 0)
460   {
461     int offset = strlen ("update") + 1;
462     return (handle_request_update (fd, buffer + offset,
463           buffer_size - offset));
464   }
465   else
466   {
467     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
468     return (-1);
469   }
470 } /* }}} int handle_request */
471
472 static void *connection_thread_main (void *args /* {{{ */
473     __attribute__((unused)))
474 {
475   pthread_t self;
476   int i;
477   int fd;
478   
479   fd = *((int *) args);
480
481   RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
482       "connetion_threads[]..");
483   pthread_mutex_lock (&connetion_threads_lock);
484   {
485     pthread_t *temp;
486
487     temp = (pthread_t *) realloc (connetion_threads,
488         sizeof (pthread_t) * (connetion_threads_num + 1));
489     if (temp == NULL)
490     {
491       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
492     }
493     else
494     {
495       connetion_threads = temp;
496       connetion_threads[connetion_threads_num] = pthread_self ();
497       connetion_threads_num++;
498     }
499   }
500   pthread_mutex_unlock (&connetion_threads_lock);
501   RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
502
503   while (do_shutdown == 0)
504   {
505     struct pollfd pollfd;
506     int status;
507
508     pollfd.fd = fd;
509     pollfd.events = POLLIN | POLLPRI;
510     pollfd.revents = 0;
511
512     status = poll (&pollfd, 1, /* timeout = */ 500);
513     if (status == 0) /* timeout */
514       continue;
515     else if (status < 0) /* error */
516     {
517       status = errno;
518       if (status == EINTR)
519         continue;
520       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
521       continue;
522     }
523
524     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
525     {
526       RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
527           "poll(2) returned POLLHUP.");
528       close (fd);
529       break;
530     }
531     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
532     {
533       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
534           "poll(2) returned something unexpected: %#04hx",
535           pollfd.revents);
536       close (fd);
537       break;
538     }
539
540     status = handle_request (fd);
541     if (status != 0)
542     {
543       close (fd);
544       break;
545     }
546   }
547
548   self = pthread_self ();
549   /* Remove this thread from the connection threads list */
550   pthread_mutex_lock (&connetion_threads_lock);
551   /* Find out own index in the array */
552   for (i = 0; i < connetion_threads_num; i++)
553     if (pthread_equal (connetion_threads[i], self) != 0)
554       break;
555   assert (i < connetion_threads_num);
556
557   /* Move the trailing threads forward. */
558   if (i < (connetion_threads_num - 1))
559   {
560     memmove (connetion_threads + i,
561         connetion_threads + i + 1,
562         sizeof (pthread_t) * (connetion_threads_num - i - 1));
563   }
564
565   connetion_threads_num--;
566   pthread_mutex_unlock (&connetion_threads_lock);
567
568   free (args);
569   return (NULL);
570 } /* }}} void *connection_thread_main */
571
572 static int open_listen_socket_unix (const char *path) /* {{{ */
573 {
574   int fd;
575   struct sockaddr_un sa;
576   listen_socket_t *temp;
577   int status;
578
579   temp = (listen_socket_t *) realloc (listen_fds,
580       sizeof (listen_fds[0]) * (listen_fds_num + 1));
581   if (temp == NULL)
582   {
583     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
584     return (-1);
585   }
586   listen_fds = temp;
587   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
588
589   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
590   if (fd < 0)
591   {
592     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
593     return (-1);
594   }
595
596   memset (&sa, 0, sizeof (sa));
597   sa.sun_family = AF_UNIX;
598   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
599
600   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
601   if (status != 0)
602   {
603     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
604     close (fd);
605     unlink (path);
606     return (-1);
607   }
608
609   status = listen (fd, /* backlog = */ 10);
610   if (status != 0)
611   {
612     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
613     close (fd);
614     unlink (path);
615     return (-1);
616   }
617   
618   listen_fds[listen_fds_num].fd = fd;
619   snprintf (listen_fds[listen_fds_num].path,
620       sizeof (listen_fds[listen_fds_num].path) - 1,
621       "unix:%s", path);
622   listen_fds_num++;
623
624   return (0);
625 } /* }}} int open_listen_socket_unix */
626
627 static int open_listen_socket (const char *addr) /* {{{ */
628 {
629   struct addrinfo ai_hints;
630   struct addrinfo *ai_res;
631   struct addrinfo *ai_ptr;
632   int status;
633
634   assert (addr != NULL);
635
636   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
637     return (open_listen_socket_unix (addr + strlen ("unix:")));
638   else if (addr[0] == '/')
639     return (open_listen_socket_unix (addr));
640
641   memset (&ai_hints, 0, sizeof (ai_hints));
642   ai_hints.ai_flags = 0;
643 #ifdef AI_ADDRCONFIG
644   ai_hints.ai_flags |= AI_ADDRCONFIG;
645 #endif
646   ai_hints.ai_family = AF_UNSPEC;
647   ai_hints.ai_socktype = SOCK_STREAM;
648
649   ai_res = NULL;
650   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
651   if (status != 0)
652   {
653     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
654         "%s", addr, gai_strerror (status));
655     return (-1);
656   }
657
658   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
659   {
660     int fd;
661     listen_socket_t *temp;
662
663     temp = (listen_socket_t *) realloc (listen_fds,
664         sizeof (listen_fds[0]) * (listen_fds_num + 1));
665     if (temp == NULL)
666     {
667       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
668       continue;
669     }
670     listen_fds = temp;
671     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
672
673     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
674     if (fd < 0)
675     {
676       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
677       continue;
678     }
679
680     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
681     if (status != 0)
682     {
683       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
684       close (fd);
685       continue;
686     }
687
688     status = listen (fd, /* backlog = */ 10);
689     if (status != 0)
690     {
691       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
692       close (fd);
693       return (-1);
694     }
695
696     listen_fds[listen_fds_num].fd = fd;
697     strncpy (listen_fds[listen_fds_num].path, addr,
698         sizeof (listen_fds[listen_fds_num].path) - 1);
699     listen_fds_num++;
700   } /* for (ai_ptr) */
701
702   return (0);
703 } /* }}} int open_listen_socket */
704
705 static int close_listen_sockets (void) /* {{{ */
706 {
707   size_t i;
708
709   for (i = 0; i < listen_fds_num; i++)
710   {
711     close (listen_fds[i].fd);
712     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
713       unlink (listen_fds[i].path + strlen ("unix:"));
714   }
715
716   free (listen_fds);
717   listen_fds = NULL;
718   listen_fds_num = 0;
719
720   return (0);
721 } /* }}} int close_listen_sockets */
722
723 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
724 {
725   struct pollfd *pollfds;
726   int pollfds_num;
727   int status;
728   int i;
729
730   for (i = 0; i < config_listen_address_list_len; i++)
731   {
732     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
733         "= %s", i, config_listen_address_list[i]);
734     open_listen_socket (config_listen_address_list[i]);
735   }
736
737   if (config_listen_address_list_len < 1)
738     open_listen_socket (RRDD_SOCK_PATH);
739
740   if (listen_fds_num < 1)
741   {
742     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
743         "could be opened. Sorry.");
744     return (NULL);
745   }
746
747   pollfds_num = listen_fds_num;
748   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
749   if (pollfds == NULL)
750   {
751     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
752     return (NULL);
753   }
754   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
755
756   while (do_shutdown == 0)
757   {
758     assert (pollfds_num == ((int) listen_fds_num));
759     for (i = 0; i < pollfds_num; i++)
760     {
761       pollfds[i].fd = listen_fds[i].fd;
762       pollfds[i].events = POLLIN | POLLPRI;
763       pollfds[i].revents = 0;
764     }
765
766     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
767     if (status < 1)
768     {
769       status = errno;
770       if (status != EINTR)
771       {
772         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
773       }
774       continue;
775     }
776
777     for (i = 0; i < pollfds_num; i++)
778     {
779       int *client_sd;
780       struct sockaddr_storage client_sa;
781       socklen_t client_sa_size;
782       pthread_t tid;
783
784       if (pollfds[i].revents == 0)
785         continue;
786
787       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
788       {
789         RRDD_LOG (LOG_ERR, "listen_thread_main: "
790             "poll(2) returned something unexpected for listen FD #%i.",
791             pollfds[i].fd);
792         continue;
793       }
794
795       client_sd = (int *) malloc (sizeof (int));
796       if (client_sd == NULL)
797       {
798         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
799         continue;
800       }
801
802       client_sa_size = sizeof (client_sa);
803       *client_sd = accept (pollfds[i].fd,
804           (struct sockaddr *) &client_sa, &client_sa_size);
805       if (*client_sd < 0)
806       {
807         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
808         continue;
809       }
810
811       RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
812           *client_sd);
813
814       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
815           /* args = */ (void *) client_sd);
816       if (status != 0)
817       {
818         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
819         close (*client_sd);
820         free (client_sd);
821         continue;
822       }
823
824       RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
825           "tid = %lu",
826           *((unsigned long *) &tid));
827     } /* for (pollfds_num) */
828   } /* while (do_shutdown == 0) */
829
830   close_listen_sockets ();
831
832   pthread_mutex_lock (&connetion_threads_lock);
833   while (connetion_threads_num > 0)
834   {
835     pthread_t wait_for;
836
837     wait_for = connetion_threads[0];
838
839     pthread_mutex_unlock (&connetion_threads_lock);
840     pthread_join (wait_for, /* retval = */ NULL);
841     pthread_mutex_lock (&connetion_threads_lock);
842   }
843   pthread_mutex_unlock (&connetion_threads_lock);
844
845   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
846
847   return (NULL);
848 } /* }}} void *listen_thread_main */
849
850 static int daemonize (void) /* {{{ */
851 {
852   pid_t child;
853   int status;
854
855   child = fork ();
856   if (child < 0)
857   {
858     fprintf (stderr, "daemonize: fork(2) failed.\n");
859     return (-1);
860   }
861   else if (child > 0)
862   {
863     return (1);
864   }
865
866   /* Change into the /tmp directory. */
867   chdir ("/tmp");
868
869   /* Become session leader */
870   setsid ();
871
872   /* Open the first three file descriptors to /dev/null */
873   close (2);
874   close (1);
875   close (0);
876
877   open ("/dev/null", O_RDWR);
878   dup (0);
879   dup (0);
880
881   {
882     struct sigaction sa;
883
884     memset (&sa, 0, sizeof (sa));
885     sa.sa_handler = sig_int_handler;
886     sigaction (SIGINT, &sa, NULL);
887
888     memset (&sa, 0, sizeof (sa));
889     sa.sa_handler = sig_term_handler;
890     sigaction (SIGINT, &sa, NULL);
891
892     memset (&sa, 0, sizeof (sa));
893     sa.sa_handler = SIG_IGN;
894     sigaction (SIGPIPE, &sa, NULL);
895   }
896
897   openlog ("rrdd", LOG_PID, LOG_DAEMON);
898
899   cache_tree = g_tree_new ((GCompareFunc) strcmp);
900   if (cache_tree == NULL)
901   {
902     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
903     return (-1);
904   }
905
906   memset (&queue_thread, 0, sizeof (queue_thread));
907   status = pthread_create (&queue_thread, /* attr = */ NULL,
908       queue_thread_main, /* args = */ NULL);
909   if (status != 0)
910   {
911     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
912     return (-1);
913   }
914
915   return (0);
916 } /* }}} int daemonize */
917
918 static int cleanup (void) /* {{{ */
919 {
920   RRDD_LOG (LOG_DEBUG, "cleanup ()");
921
922   do_shutdown++;
923
924   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
925   pthread_cond_signal (&cache_cond);
926   pthread_join (queue_thread, /* return = */ NULL);
927   RRDD_LOG (LOG_DEBUG, "cleanup: done");
928
929   closelog ();
930
931   return (0);
932 } /* }}} int cleanup */
933
934 static int read_options (int argc, char **argv) /* {{{ */
935 {
936   int option;
937   int status = 0;
938
939   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
940   {
941     switch (option)
942     {
943       case 'l':
944       {
945         char **temp;
946
947         temp = (char **) realloc (config_listen_address_list,
948             sizeof (char *) * (config_listen_address_list_len + 1));
949         if (temp == NULL)
950         {
951           fprintf (stderr, "read_options: realloc failed.\n");
952           return (2);
953         }
954         config_listen_address_list = temp;
955
956         temp[config_listen_address_list_len] = strdup (optarg);
957         if (temp[config_listen_address_list_len] == NULL)
958         {
959           fprintf (stderr, "read_options: strdup failed.\n");
960           return (2);
961         }
962         config_listen_address_list_len++;
963       }
964       break;
965
966       case 'f':
967       {
968         int temp;
969
970         temp = atoi (optarg);
971         if (temp > 0)
972           config_flush_interval = temp;
973         else
974         {
975           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
976           status = 3;
977         }
978       }
979       break;
980
981       case 'w':
982       {
983         int temp;
984
985         temp = atoi (optarg);
986         if (temp > 0)
987           config_write_interval = temp;
988         else
989         {
990           fprintf (stderr, "Invalid write interval: %s\n", optarg);
991           status = 2;
992         }
993       }
994       break;
995
996       case 'h':
997       case '?':
998         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
999             "\n"
1000             "Usage: rrdd [options]\n"
1001             "\n"
1002             "Valid options are:\n"
1003             "  -l <address>  Socket address to listen to.\n"
1004             "  -w <seconds>  Interval in which to write data.\n"
1005             "  -f <seconds>  Interval in which to flush dead data.\n"
1006             "\n"
1007             "For more information and a detailed description of all options "
1008             "please refer\n"
1009             "to the rrdd(1) manual page.\n",
1010             VERSION);
1011         status = -1;
1012         break;
1013     } /* switch (option) */
1014   } /* while (getopt) */
1015
1016   return (status);
1017 } /* }}} int read_options */
1018
1019 int main (int argc, char **argv)
1020 {
1021   int status;
1022
1023   status = read_options (argc, argv);
1024   if (status != 0)
1025   {
1026     if (status < 0)
1027       status = 0;
1028     return (status);
1029   }
1030
1031   status = daemonize ();
1032   if (status == 1)
1033   {
1034     struct sigaction sigchld;
1035
1036     memset (&sigchld, 0, sizeof (sigchld));
1037     sigchld.sa_handler = SIG_IGN;
1038     sigaction (SIGCHLD, &sigchld, NULL);
1039
1040     return (0);
1041   }
1042   else if (status != 0)
1043   {
1044     fprintf (stderr, "daemonize failed, exiting.\n");
1045     return (1);
1046   }
1047
1048   listen_thread_main (NULL);
1049
1050   cleanup ();
1051
1052   return (0);
1053 } /* int main */
1054
1055 /*
1056  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1057  */