src/rrd_daemon.c: Set the CI_FLAGS_IN_QUEUE flag when queueing a CI.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 /*
120  * Variables
121  */
122 static listen_socket_t *listen_fds = NULL;
123 static size_t listen_fds_num = 0;
124
125 static int do_shutdown = 0;
126
127 static pthread_t queue_thread;
128
129 static pthread_t *connetion_threads = NULL;
130 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
131 static int connetion_threads_num = 0;
132
133 /* Cache stuff */
134 static GTree          *cache_tree = NULL;
135 static cache_item_t   *cache_queue_head = NULL;
136 static cache_item_t   *cache_queue_tail = NULL;
137 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
138 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
139
140 static int config_write_interval = 300;
141 static int config_flush_interval = 3600;
142
143 static char **config_listen_address_list = NULL;
144 static int config_listen_address_list_len = 0;
145
146 /* 
147  * Functions
148  */
149 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
150 {
151   do_shutdown++;
152 } /* }}} void sig_int_handler */
153
154 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
155 {
156   do_shutdown++;
157 } /* }}} void sig_term_handler */
158
159 /*
160  * enqueue_cache_item:
161  * `cache_lock' must be acquired before calling this function!
162  */
163 static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
164 {
165   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
166       ci->file);
167
168   if (ci == NULL)
169     return (-1);
170
171   if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
172     return (-1);
173
174   assert (ci->next == NULL);
175
176   if (ci->values_num == 0)
177     return (0);
178
179   if (cache_queue_tail == NULL)
180     cache_queue_head = ci;
181   else
182     cache_queue_tail->next = ci;
183   cache_queue_tail = ci;
184
185   ci->flags |= CI_FLAGS_IN_QUEUE;
186
187   return (0);
188 } /* }}} int enqueue_cache_item */
189
190 /*
191  * tree_callback_flush:
192  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
193  * while this is in progress.
194  */
195 static gboolean tree_callback_flush (gpointer key /* {{{ */
196     __attribute__((unused)), gpointer value, gpointer data)
197 {
198   cache_item_t *ci;
199   time_t now;
200
201   key = NULL; /* make compiler happy */
202
203   ci = (cache_item_t *) value;
204   now = *((time_t *) data);
205
206   if (((now - ci->last_flush_time) >= config_write_interval)
207       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
208     enqueue_cache_item (ci);
209
210   return (TRUE);
211 } /* }}} gboolean tree_callback_flush */
212
213 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
214 {
215   struct timeval now;
216   struct timespec next_flush;
217
218   gettimeofday (&now, NULL);
219   next_flush.tv_sec = now.tv_sec + config_flush_interval;
220   next_flush.tv_nsec = 1000 * now.tv_usec;
221
222   pthread_mutex_lock (&cache_lock);
223   while ((do_shutdown == 0) || (cache_queue_head != NULL))
224   {
225     cache_item_t *ci;
226     char *file;
227     char **values;
228     int values_num;
229     int status;
230     int i;
231
232     /* First, check if it's time to do the cache flush. */
233     gettimeofday (&now, NULL);
234     if ((now.tv_sec > next_flush.tv_sec)
235         || ((now.tv_sec == next_flush.tv_sec)
236           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
237     {
238       time_t time_now;
239
240       /* Pass the current time as user data so that we don't need to call
241        * `time' for each node. */
242       time_now = time (NULL);
243
244       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
245
246       /* Determine the time of the next cache flush. */
247       while (next_flush.tv_sec < now.tv_sec)
248         next_flush.tv_sec += config_flush_interval;
249     }
250
251     /* Now, check if there's something to store away. If not, wait until
252      * something comes in or it's time to do the cache flush. */
253     if (cache_queue_head == NULL)
254     {
255       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
256       if ((status != 0) && (status != ETIMEDOUT))
257       {
258         RRDD_LOG (LOG_ERR, "queue_thread_main: "
259             "pthread_cond_timedwait returned %i.", status);
260       }
261     }
262
263     /* Check if a value has arrived. This may be NULL if we timed out or there
264      * was an interrupt such as a signal. */
265     if (cache_queue_head == NULL)
266       continue;
267
268     ci = cache_queue_head;
269
270     /* copy the relevant parts */
271     file = strdup (ci->file);
272     if (file == NULL)
273     {
274       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
275       continue;
276     }
277
278     values = ci->values;
279     values_num = ci->values_num;
280
281     ci->values = NULL;
282     ci->values_num = 0;
283
284     ci->last_flush_time = time (NULL);
285     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
286
287     cache_queue_head = ci->next;
288     if (cache_queue_head == NULL)
289       cache_queue_tail = NULL;
290     ci->next = NULL;
291
292     pthread_mutex_unlock (&cache_lock);
293
294     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
295         file, values_num, (void *) values);
296
297     status = rrd_update_r (file, NULL, values_num, (void *) values);
298     if (status != 0)
299     {
300       RRDD_LOG (LOG_ERR, "queue_thread_main: "
301           "rrd_update_r failed with status %i.",
302           status);
303     }
304
305     free (file);
306     for (i = 0; i < values_num; i++)
307       free (values[i]);
308
309     pthread_mutex_lock (&cache_lock);
310   } /* while (do_shutdown == 0) */
311   pthread_mutex_unlock (&cache_lock);
312
313   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
314
315   return (NULL);
316 } /* }}} void *queue_thread_main */
317
318 static int handle_request_update (int fd, /* {{{ */
319     char *buffer, int buffer_size __attribute__((unused)))
320 {
321   char *file;
322   char *value;
323   char *buffer_ptr;
324   int values_num = 0;
325   int status;
326
327   time_t now;
328
329   cache_item_t *ci;
330   char answer[4096];
331
332   now = time (NULL);
333
334   buffer_ptr = buffer;
335
336   file = buffer_ptr;
337   buffer_ptr += strlen (file) + 1;
338
339   pthread_mutex_lock (&cache_lock);
340
341   ci = g_tree_lookup (cache_tree, file);
342   if (ci == NULL)
343   {
344     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
345     if (ci == NULL)
346     {
347       pthread_mutex_unlock (&cache_lock);
348       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
349       return (-1);
350     }
351     memset (ci, 0, sizeof (cache_item_t));
352
353     ci->file = strdup (file);
354     if (ci->file == NULL)
355     {
356       pthread_mutex_unlock (&cache_lock);
357       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
358       free (ci);
359       return (-1);
360     }
361
362     ci->values = NULL;
363     ci->values_num = 0;
364     ci->last_flush_time = now;
365     ci->flags = CI_FLAGS_IN_TREE;
366
367     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
368
369     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
370         ci->file);
371   }
372   assert (ci != NULL);
373
374   while (*buffer_ptr != 0)
375   {
376     char **temp;
377
378     value = buffer_ptr;
379     buffer_ptr += strlen (value) + 1;
380
381     temp = (char **) realloc (ci->values,
382         sizeof (char *) * (ci->values_num + 1));
383     if (temp == NULL)
384     {
385       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
386       continue;
387     }
388     ci->values = temp;
389
390     ci->values[ci->values_num] = strdup (value);
391     if (ci->values[ci->values_num] == NULL)
392     {
393       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
394       continue;
395     }
396     ci->values_num++;
397
398     values_num++;
399   }
400
401   if (((now - ci->last_flush_time) >= config_write_interval)
402       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
403   {
404     enqueue_cache_item (ci);
405     pthread_cond_signal (&cache_cond);
406   }
407
408   pthread_mutex_unlock (&cache_lock);
409
410   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
411   answer[sizeof (answer) - 1] = 0;
412
413   status = write (fd, answer, sizeof (answer));
414   if (status < 0)
415   {
416     status = errno;
417     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
418     return (status);
419   }
420
421   return (0);
422 } /* }}} int handle_request_update */
423
424 static int handle_request (int fd) /* {{{ */
425 {
426   char buffer[4096];
427   int buffer_size;
428
429   buffer_size = read (fd, buffer, sizeof (buffer));
430   if (buffer_size < 1)
431   {
432     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
433     return (-1);
434   }
435   assert (((size_t) buffer_size) <= sizeof (buffer));
436
437   if ((buffer[buffer_size - 2] != 0)
438       || (buffer[buffer_size - 1] != 0))
439   {
440     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
441     return (-1);
442   }
443
444   /* fields in the buffer a separated by null bytes. */
445   if (strcmp (buffer, "update") == 0)
446   {
447     int offset = strlen ("update") + 1;
448     return (handle_request_update (fd, buffer + offset,
449           buffer_size - offset));
450   }
451   else
452   {
453     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
454     return (-1);
455   }
456 } /* }}} int handle_request */
457
458 static void *connection_thread_main (void *args /* {{{ */
459     __attribute__((unused)))
460 {
461   pthread_t self;
462   int i;
463   int fd;
464   
465   fd = *((int *) args);
466
467   pthread_mutex_lock (&connetion_threads_lock);
468   {
469     pthread_t *temp;
470
471     temp = (pthread_t *) realloc (connetion_threads,
472         sizeof (pthread_t) * (connetion_threads_num + 1));
473     if (temp == NULL)
474     {
475       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
476     }
477     else
478     {
479       connetion_threads = temp;
480       connetion_threads[connetion_threads_num] = pthread_self ();
481       connetion_threads_num++;
482     }
483   }
484   pthread_mutex_unlock (&connetion_threads_lock);
485
486   while (do_shutdown == 0)
487   {
488     struct pollfd pollfd;
489     int status;
490
491     pollfd.fd = fd;
492     pollfd.events = POLLIN | POLLPRI;
493     pollfd.revents = 0;
494
495     status = poll (&pollfd, 1, /* timeout = */ 500);
496     if (status == 0) /* timeout */
497       continue;
498     else if (status < 0) /* error */
499     {
500       status = errno;
501       if (status == EINTR)
502         continue;
503       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
504       continue;
505     }
506
507     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
508     {
509       close (fd);
510       break;
511     }
512     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
513     {
514       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
515           "poll(2) returned something unexpected: %#04hx",
516           pollfd.revents);
517       close (fd);
518       break;
519     }
520
521     status = handle_request (fd);
522     if (status != 0)
523     {
524       close (fd);
525       break;
526     }
527   }
528
529   self = pthread_self ();
530   /* Remove this thread from the connection threads list */
531   pthread_mutex_lock (&connetion_threads_lock);
532   /* Find out own index in the array */
533   for (i = 0; i < connetion_threads_num; i++)
534     if (pthread_equal (connetion_threads[i], self) != 0)
535       break;
536   assert (i < connetion_threads_num);
537
538   /* Move the trailing threads forward. */
539   if (i < (connetion_threads_num - 1))
540   {
541     memmove (connetion_threads + i,
542         connetion_threads + i + 1,
543         sizeof (pthread_t) * (connetion_threads_num - i - 1));
544   }
545
546   connetion_threads_num--;
547   pthread_mutex_unlock (&connetion_threads_lock);
548
549   free (args);
550   return (NULL);
551 } /* }}} void *connection_thread_main */
552
553 static int open_listen_socket_unix (const char *path) /* {{{ */
554 {
555   int fd;
556   struct sockaddr_un sa;
557   listen_socket_t *temp;
558   int status;
559
560   temp = (listen_socket_t *) realloc (listen_fds,
561       sizeof (listen_fds[0]) * (listen_fds_num + 1));
562   if (temp == NULL)
563   {
564     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
565     return (-1);
566   }
567   listen_fds = temp;
568   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
569
570   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
571   if (fd < 0)
572   {
573     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
574     return (-1);
575   }
576
577   memset (&sa, 0, sizeof (sa));
578   sa.sun_family = AF_UNIX;
579   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
580
581   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
582   if (status != 0)
583   {
584     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
585     close (fd);
586     unlink (path);
587     return (-1);
588   }
589
590   status = listen (fd, /* backlog = */ 10);
591   if (status != 0)
592   {
593     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
594     close (fd);
595     unlink (path);
596     return (-1);
597   }
598   
599   listen_fds[listen_fds_num].fd = fd;
600   snprintf (listen_fds[listen_fds_num].path,
601       sizeof (listen_fds[listen_fds_num].path) - 1,
602       "unix:%s", path);
603   listen_fds_num++;
604
605   return (0);
606 } /* }}} int open_listen_socket_unix */
607
608 static int open_listen_socket (const char *addr) /* {{{ */
609 {
610   struct addrinfo ai_hints;
611   struct addrinfo *ai_res;
612   struct addrinfo *ai_ptr;
613   int status;
614
615   assert (addr != NULL);
616
617   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
618     return (open_listen_socket_unix (addr + strlen ("unix:")));
619   else if (addr[0] == '/')
620     return (open_listen_socket_unix (addr));
621
622   memset (&ai_hints, 0, sizeof (ai_hints));
623   ai_hints.ai_flags = 0;
624 #ifdef AI_ADDRCONFIG
625   ai_hints.ai_flags |= AI_ADDRCONFIG;
626 #endif
627   ai_hints.ai_family = AF_UNSPEC;
628   ai_hints.ai_socktype = SOCK_STREAM;
629
630   ai_res = NULL;
631   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
632   if (status != 0)
633   {
634     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
635         "%s", addr, gai_strerror (status));
636     return (-1);
637   }
638
639   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
640   {
641     int fd;
642     listen_socket_t *temp;
643
644     temp = (listen_socket_t *) realloc (listen_fds,
645         sizeof (listen_fds[0]) * (listen_fds_num + 1));
646     if (temp == NULL)
647     {
648       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
649       continue;
650     }
651     listen_fds = temp;
652     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
653
654     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
655     if (fd < 0)
656     {
657       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
658       continue;
659     }
660
661     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
662     if (status != 0)
663     {
664       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
665       close (fd);
666       continue;
667     }
668
669     status = listen (fd, /* backlog = */ 10);
670     if (status != 0)
671     {
672       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
673       close (fd);
674       return (-1);
675     }
676
677     listen_fds[listen_fds_num].fd = fd;
678     strncpy (listen_fds[listen_fds_num].path, addr,
679         sizeof (listen_fds[listen_fds_num].path) - 1);
680     listen_fds_num++;
681   } /* for (ai_ptr) */
682
683   return (0);
684 } /* }}} int open_listen_socket */
685
686 static int close_listen_sockets (void) /* {{{ */
687 {
688   size_t i;
689
690   for (i = 0; i < listen_fds_num; i++)
691   {
692     close (listen_fds[i].fd);
693     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
694       unlink (listen_fds[i].path + strlen ("unix:"));
695   }
696
697   free (listen_fds);
698   listen_fds = NULL;
699   listen_fds_num = 0;
700
701   return (0);
702 } /* }}} int close_listen_sockets */
703
704 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
705 {
706   struct pollfd *pollfds;
707   int pollfds_num;
708   int status;
709   int i;
710
711   for (i = 0; i < config_listen_address_list_len; i++)
712   {
713     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
714         "= %s", i, config_listen_address_list[i]);
715     open_listen_socket (config_listen_address_list[i]);
716   }
717
718   if (config_listen_address_list_len < 1)
719     open_listen_socket (RRDD_SOCK_PATH);
720
721   if (listen_fds_num < 1)
722   {
723     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
724         "could be opened. Sorry.");
725     return (NULL);
726   }
727
728   pollfds_num = listen_fds_num;
729   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
730   if (pollfds == NULL)
731   {
732     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
733     return (NULL);
734   }
735   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
736
737   while (do_shutdown == 0)
738   {
739     assert (pollfds_num == ((int) listen_fds_num));
740     for (i = 0; i < pollfds_num; i++)
741     {
742       pollfds[i].fd = listen_fds[i].fd;
743       pollfds[i].events = POLLIN | POLLPRI;
744       pollfds[i].revents = 0;
745     }
746
747     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
748     if (status < 1)
749     {
750       status = errno;
751       if (status != EINTR)
752       {
753         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
754       }
755       continue;
756     }
757
758     for (i = 0; i < pollfds_num; i++)
759     {
760       int *client_sd;
761       struct sockaddr_storage client_sa;
762       socklen_t client_sa_size;
763       pthread_t tid;
764
765       if (pollfds[i].revents == 0)
766         continue;
767
768       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
769       {
770         RRDD_LOG (LOG_ERR, "listen_thread_main: "
771             "poll(2) returned something unexpected for listen FD #%i.",
772             pollfds[i].fd);
773         continue;
774       }
775
776       client_sd = (int *) malloc (sizeof (int));
777       if (client_sd == NULL)
778       {
779         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
780         continue;
781       }
782
783       client_sa_size = sizeof (client_sa);
784       *client_sd = accept (pollfds[i].fd,
785           (struct sockaddr *) &client_sa, &client_sa_size);
786       if (*client_sd < 0)
787       {
788         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
789         continue;
790       }
791
792       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
793           /* args = */ (void *) client_sd);
794       if (status != 0)
795       {
796         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
797         close (*client_sd);
798         free (client_sd);
799         continue;
800       }
801     } /* for (pollfds_num) */
802   } /* while (do_shutdown == 0) */
803
804   close_listen_sockets ();
805
806   pthread_mutex_lock (&connetion_threads_lock);
807   while (connetion_threads_num > 0)
808   {
809     pthread_t wait_for;
810
811     wait_for = connetion_threads[0];
812
813     pthread_mutex_unlock (&connetion_threads_lock);
814     pthread_join (wait_for, /* retval = */ NULL);
815     pthread_mutex_lock (&connetion_threads_lock);
816   }
817   pthread_mutex_unlock (&connetion_threads_lock);
818
819   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
820
821   return (NULL);
822 } /* }}} void *listen_thread_main */
823
824 static int daemonize (void) /* {{{ */
825 {
826   pid_t child;
827   int status;
828
829   child = fork ();
830   if (child < 0)
831   {
832     fprintf (stderr, "daemonize: fork(2) failed.\n");
833     return (-1);
834   }
835   else if (child > 0)
836   {
837     return (1);
838   }
839
840   /* Change into the /tmp directory. */
841   chdir ("/tmp");
842
843   /* Become session leader */
844   setsid ();
845
846   /* Open the first three file descriptors to /dev/null */
847   close (2);
848   close (1);
849   close (0);
850
851   open ("/dev/null", O_RDWR);
852   dup (0);
853   dup (0);
854
855   {
856     struct sigaction sa;
857
858     memset (&sa, 0, sizeof (sa));
859     sa.sa_handler = sig_int_handler;
860     sigaction (SIGINT, &sa, NULL);
861
862     memset (&sa, 0, sizeof (sa));
863     sa.sa_handler = sig_term_handler;
864     sigaction (SIGINT, &sa, NULL);
865
866     memset (&sa, 0, sizeof (sa));
867     sa.sa_handler = SIG_IGN;
868     sigaction (SIGPIPE, &sa, NULL);
869   }
870
871   openlog ("rrdd", LOG_PID, LOG_DAEMON);
872
873   cache_tree = g_tree_new ((GCompareFunc) strcmp);
874   if (cache_tree == NULL)
875   {
876     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
877     return (-1);
878   }
879
880   memset (&queue_thread, 0, sizeof (queue_thread));
881   status = pthread_create (&queue_thread, /* attr = */ NULL,
882       queue_thread_main, /* args = */ NULL);
883   if (status != 0)
884   {
885     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
886     return (-1);
887   }
888
889   return (0);
890 } /* }}} int daemonize */
891
892 static int cleanup (void) /* {{{ */
893 {
894   RRDD_LOG (LOG_DEBUG, "cleanup ()");
895
896   do_shutdown++;
897
898   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
899   pthread_cond_signal (&cache_cond);
900   pthread_join (queue_thread, /* return = */ NULL);
901   RRDD_LOG (LOG_DEBUG, "cleanup: done");
902
903   closelog ();
904
905   return (0);
906 } /* }}} int cleanup */
907
908 static int read_options (int argc, char **argv) /* {{{ */
909 {
910   int option;
911   int status = 0;
912
913   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
914   {
915     switch (option)
916     {
917       case 'l':
918       {
919         char **temp;
920
921         temp = (char **) realloc (config_listen_address_list,
922             sizeof (char *) * (config_listen_address_list_len + 1));
923         if (temp == NULL)
924         {
925           fprintf (stderr, "read_options: realloc failed.\n");
926           return (2);
927         }
928         config_listen_address_list = temp;
929
930         temp[config_listen_address_list_len] = strdup (optarg);
931         if (temp[config_listen_address_list_len] == NULL)
932         {
933           fprintf (stderr, "read_options: strdup failed.\n");
934           return (2);
935         }
936         config_listen_address_list_len++;
937       }
938       break;
939
940       case 'f':
941       {
942         int temp;
943
944         temp = atoi (optarg);
945         if (temp > 0)
946           config_flush_interval = temp;
947         else
948         {
949           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
950           status = 3;
951         }
952       }
953       break;
954
955       case 'w':
956       {
957         int temp;
958
959         temp = atoi (optarg);
960         if (temp > 0)
961           config_write_interval = temp;
962         else
963         {
964           fprintf (stderr, "Invalid write interval: %s\n", optarg);
965           status = 2;
966         }
967       }
968       break;
969
970       case 'h':
971       case '?':
972         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
973             "\n"
974             "Usage: rrdd [options]\n"
975             "\n"
976             "Valid options are:\n"
977             "  -l <address>  Socket address to listen to.\n"
978             "  -w <seconds>  Interval in which to write data.\n"
979             "  -f <seconds>  Interval in which to flush dead data.\n"
980             "\n"
981             "For more information and a detailed description of all options "
982             "please refer\n"
983             "to the rrdd(1) manual page.\n",
984             VERSION);
985         status = -1;
986         break;
987     } /* switch (option) */
988   } /* while (getopt) */
989
990   return (status);
991 } /* }}} int read_options */
992
993 int main (int argc, char **argv)
994 {
995   int status;
996
997   status = read_options (argc, argv);
998   if (status != 0)
999   {
1000     if (status < 0)
1001       status = 0;
1002     return (status);
1003   }
1004
1005   status = daemonize ();
1006   if (status == 1)
1007   {
1008     struct sigaction sigchld;
1009
1010     memset (&sigchld, 0, sizeof (sigchld));
1011     sigchld.sa_handler = SIG_IGN;
1012     sigaction (SIGCHLD, &sigchld, NULL);
1013
1014     return (0);
1015   }
1016   else if (status != 0)
1017   {
1018     fprintf (stderr, "daemonize failed, exiting.\n");
1019     return (1);
1020   }
1021
1022   listen_thread_main (NULL);
1023
1024   cleanup ();
1025
1026   return (0);
1027 } /* int main */
1028
1029 /*
1030  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1031  */