df31d514212199ddf21bdf7de767f7c27ae43b0c
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 /*
120  * Variables
121  */
122 static listen_socket_t *listen_fds = NULL;
123 static size_t listen_fds_num = 0;
124
125 static int do_shutdown = 0;
126
127 static pthread_t queue_thread;
128
129 static pthread_t *connetion_threads = NULL;
130 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
131 static int connetion_threads_num = 0;
132
133 /* Cache stuff */
134 static GTree          *cache_tree = NULL;
135 static cache_item_t   *cache_queue_head = NULL;
136 static cache_item_t   *cache_queue_tail = NULL;
137 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
138 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
139
140 static int config_write_interval = 300;
141 static int config_flush_interval = 3600;
142
143 static char **config_listen_address_list = NULL;
144 static int config_listen_address_list_len = 0;
145
146 /* 
147  * Functions
148  */
149 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
150 {
151   do_shutdown++;
152 } /* }}} void sig_int_handler */
153
154 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
155 {
156   do_shutdown++;
157 } /* }}} void sig_term_handler */
158
159 /*
160  * enqueue_cache_item:
161  * `cache_lock' must be acquired before calling this function!
162  */
163 static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
164 {
165   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
166       ci->file);
167
168   if (ci == NULL)
169     return (-1);
170
171   if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
172     return (-1);
173
174   assert (ci->next == NULL);
175
176   if (ci->values_num == 0)
177     return (0);
178
179   if (cache_queue_tail == NULL)
180     cache_queue_head = ci;
181   else
182     cache_queue_tail->next = ci;
183   cache_queue_tail = ci;
184
185   ci->flags |= CI_FLAGS_IN_QUEUE;
186
187   ci->flags |= CI_FLAGS_IN_QUEUE;
188
189   return (0);
190 } /* }}} int enqueue_cache_item */
191
192 /*
193  * tree_callback_flush:
194  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
195  * while this is in progress.
196  */
197 static gboolean tree_callback_flush (gpointer key /* {{{ */
198     __attribute__((unused)), gpointer value, gpointer data)
199 {
200   cache_item_t *ci;
201   time_t now;
202
203   key = NULL; /* make compiler happy */
204
205   ci = (cache_item_t *) value;
206   now = *((time_t *) data);
207
208   if (((now - ci->last_flush_time) >= config_write_interval)
209       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
210       && (ci->values_num > 0))
211     enqueue_cache_item (ci);
212
213   return (TRUE);
214 } /* }}} gboolean tree_callback_flush */
215
216 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
217 {
218   struct timeval now;
219   struct timespec next_flush;
220
221   gettimeofday (&now, NULL);
222   next_flush.tv_sec = now.tv_sec + config_flush_interval;
223   next_flush.tv_nsec = 1000 * now.tv_usec;
224
225   pthread_mutex_lock (&cache_lock);
226   while ((do_shutdown == 0) || (cache_queue_head != NULL))
227   {
228     cache_item_t *ci;
229     char *file;
230     char **values;
231     int values_num;
232     int status;
233     int i;
234
235     /* First, check if it's time to do the cache flush. */
236     gettimeofday (&now, NULL);
237     if ((now.tv_sec > next_flush.tv_sec)
238         || ((now.tv_sec == next_flush.tv_sec)
239           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
240     {
241       time_t time_now;
242
243       /* Pass the current time as user data so that we don't need to call
244        * `time' for each node. */
245       time_now = time (NULL);
246
247       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
248
249       /* Determine the time of the next cache flush. */
250       while (next_flush.tv_sec < now.tv_sec)
251         next_flush.tv_sec += config_flush_interval;
252     }
253
254     /* Now, check if there's something to store away. If not, wait until
255      * something comes in or it's time to do the cache flush. */
256     if (cache_queue_head == NULL)
257     {
258       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
259       if ((status != 0) && (status != ETIMEDOUT))
260       {
261         RRDD_LOG (LOG_ERR, "queue_thread_main: "
262             "pthread_cond_timedwait returned %i.", status);
263       }
264     }
265
266     /* Check if a value has arrived. This may be NULL if we timed out or there
267      * was an interrupt such as a signal. */
268     if (cache_queue_head == NULL)
269       continue;
270
271     ci = cache_queue_head;
272
273     /* copy the relevant parts */
274     file = strdup (ci->file);
275     if (file == NULL)
276     {
277       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
278       continue;
279     }
280
281     values = ci->values;
282     values_num = ci->values_num;
283
284     ci->values = NULL;
285     ci->values_num = 0;
286
287     ci->last_flush_time = time (NULL);
288     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
289
290     cache_queue_head = ci->next;
291     if (cache_queue_head == NULL)
292       cache_queue_tail = NULL;
293     ci->next = NULL;
294
295     pthread_mutex_unlock (&cache_lock);
296
297     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
298         file, values_num, (void *) values);
299
300     status = rrd_update_r (file, NULL, values_num, (void *) values);
301     if (status != 0)
302     {
303       RRDD_LOG (LOG_ERR, "queue_thread_main: "
304           "rrd_update_r failed with status %i.",
305           status);
306     }
307
308     free (file);
309     for (i = 0; i < values_num; i++)
310       free (values[i]);
311
312     pthread_mutex_lock (&cache_lock);
313   } /* while (do_shutdown == 0) */
314   pthread_mutex_unlock (&cache_lock);
315
316   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
317
318   return (NULL);
319 } /* }}} void *queue_thread_main */
320
321 static int buffer_get_field (char **buffer_ret, /* {{{ */
322     size_t *buffer_size_ret, char **field_ret)
323 {
324   char *buffer;
325   size_t buffer_pos;
326   size_t buffer_size;
327   char *field;
328   size_t field_size;
329   int status;
330
331   buffer = *buffer_ret;
332   buffer_pos = 0;
333   buffer_size = *buffer_size_ret;
334   field = *buffer_ret;
335   field_size = 0;
336
337   /* This is ensured by `handle_request'. */
338   assert (buffer[buffer_size - 1] == ' ');
339
340   status = -1;
341   while (buffer_pos < buffer_size)
342   {
343     /* Check for end-of-field or end-of-buffer */
344     if (buffer[buffer_pos] == ' ')
345     {
346       field[field_size] = 0;
347       field_size++;
348       buffer_pos++;
349       status = 0;
350       break;
351     }
352     /* Handle escaped characters. */
353     else if (buffer[buffer_pos] == '\\')
354     {
355       if (buffer_pos >= (buffer_size - 1))
356         break;
357       buffer_pos++;
358       field[field_size] = buffer[buffer_pos];
359       field_size++;
360       buffer_pos++;
361     }
362     /* Normal operation */ 
363     else
364     {
365       field[field_size] = buffer[buffer_pos];
366       field_size++;
367       buffer_pos++;
368     }
369   } /* while (buffer_pos < buffer_size) */
370
371   if (status != 0)
372     return (status);
373
374   *buffer_ret = buffer + buffer_pos;
375   *buffer_size_ret = buffer_size - buffer_pos;
376   *field_ret = field;
377
378   return (0);
379 } /* }}} int buffer_get_field */
380
381 static int handle_request_update (int fd, /* {{{ */
382     char *buffer, size_t buffer_size)
383 {
384   char *file;
385   int values_num = 0;
386   int status;
387
388   time_t now;
389
390   cache_item_t *ci;
391   char answer[4096];
392
393   now = time (NULL);
394
395   status = buffer_get_field (&buffer, &buffer_size, &file);
396   if (status != 0)
397   {
398     RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
399     return (-1);
400   }
401
402   pthread_mutex_lock (&cache_lock);
403
404   ci = g_tree_lookup (cache_tree, file);
405   if (ci == NULL) /* {{{ */
406   {
407     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
408     if (ci == NULL)
409     {
410       pthread_mutex_unlock (&cache_lock);
411       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
412       return (-1);
413     }
414     memset (ci, 0, sizeof (cache_item_t));
415
416     ci->file = strdup (file);
417     if (ci->file == NULL)
418     {
419       pthread_mutex_unlock (&cache_lock);
420       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
421       free (ci);
422       return (-1);
423     }
424
425     ci->values = NULL;
426     ci->values_num = 0;
427     ci->last_flush_time = now;
428     ci->flags = CI_FLAGS_IN_TREE;
429
430     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
431
432     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
433         ci->file);
434   } /* }}} */
435   assert (ci != NULL);
436
437   while (buffer_size > 0)
438   {
439     char **temp;
440     char *value;
441
442     status = buffer_get_field (&buffer, &buffer_size, &value);
443     if (status != 0)
444     {
445       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
446       break;
447     }
448
449     temp = (char **) realloc (ci->values,
450         sizeof (char *) * (ci->values_num + 1));
451     if (temp == NULL)
452     {
453       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
454       continue;
455     }
456     ci->values = temp;
457
458     ci->values[ci->values_num] = strdup (value);
459     if (ci->values[ci->values_num] == NULL)
460     {
461       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
462       continue;
463     }
464     ci->values_num++;
465
466     values_num++;
467   }
468
469   if (((now - ci->last_flush_time) >= config_write_interval)
470       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
471       && (ci->values_num > 0))
472   {
473     enqueue_cache_item (ci);
474     pthread_cond_signal (&cache_cond);
475   }
476
477   pthread_mutex_unlock (&cache_lock);
478
479   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
480   answer[sizeof (answer) - 1] = 0;
481
482   status = write (fd, answer, strlen (answer));
483   if (status < 0)
484   {
485     status = errno;
486     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
487     return (status);
488   }
489
490   return (0);
491 } /* }}} int handle_request_update */
492
493 static int handle_request (int fd) /* {{{ */
494 {
495   char buffer[4096];
496   size_t buffer_size;
497   char *buffer_ptr;
498   char *command;
499   int status;
500
501   status = read (fd, buffer, sizeof (buffer));
502   if (status < 1)
503   {
504     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
505     return (-1);
506   }
507   buffer_size = status;
508   assert (((size_t) buffer_size) <= sizeof (buffer));
509
510   if (buffer[buffer_size - 1] != '\n')
511   {
512     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
513     return (-1);
514   }
515   /* Place the normal field separator at the end to simplify
516    * `buffer_get_field's work. */
517   buffer[buffer_size - 1] = ' ';
518
519   buffer_ptr = buffer;
520   command = NULL;
521   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
522   if (status != 0)
523   {
524     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
525     return (-1);
526   }
527
528   if (strcmp (command, "update") == 0)
529   {
530     return (handle_request_update (fd, buffer_ptr, buffer_size));
531   }
532   else
533   {
534     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
535     return (-1);
536   }
537 } /* }}} int handle_request */
538
539 static void *connection_thread_main (void *args /* {{{ */
540     __attribute__((unused)))
541 {
542   pthread_t self;
543   int i;
544   int fd;
545   
546   fd = *((int *) args);
547
548   pthread_mutex_lock (&connetion_threads_lock);
549   {
550     pthread_t *temp;
551
552     temp = (pthread_t *) realloc (connetion_threads,
553         sizeof (pthread_t) * (connetion_threads_num + 1));
554     if (temp == NULL)
555     {
556       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
557     }
558     else
559     {
560       connetion_threads = temp;
561       connetion_threads[connetion_threads_num] = pthread_self ();
562       connetion_threads_num++;
563     }
564   }
565   pthread_mutex_unlock (&connetion_threads_lock);
566
567   while (do_shutdown == 0)
568   {
569     struct pollfd pollfd;
570     int status;
571
572     pollfd.fd = fd;
573     pollfd.events = POLLIN | POLLPRI;
574     pollfd.revents = 0;
575
576     status = poll (&pollfd, 1, /* timeout = */ 500);
577     if (status == 0) /* timeout */
578       continue;
579     else if (status < 0) /* error */
580     {
581       status = errno;
582       if (status == EINTR)
583         continue;
584       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
585       continue;
586     }
587
588     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
589     {
590       close (fd);
591       break;
592     }
593     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
594     {
595       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
596           "poll(2) returned something unexpected: %#04hx",
597           pollfd.revents);
598       close (fd);
599       break;
600     }
601
602     status = handle_request (fd);
603     if (status != 0)
604     {
605       close (fd);
606       break;
607     }
608   }
609
610   self = pthread_self ();
611   /* Remove this thread from the connection threads list */
612   pthread_mutex_lock (&connetion_threads_lock);
613   /* Find out own index in the array */
614   for (i = 0; i < connetion_threads_num; i++)
615     if (pthread_equal (connetion_threads[i], self) != 0)
616       break;
617   assert (i < connetion_threads_num);
618
619   /* Move the trailing threads forward. */
620   if (i < (connetion_threads_num - 1))
621   {
622     memmove (connetion_threads + i,
623         connetion_threads + i + 1,
624         sizeof (pthread_t) * (connetion_threads_num - i - 1));
625   }
626
627   connetion_threads_num--;
628   pthread_mutex_unlock (&connetion_threads_lock);
629
630   free (args);
631   return (NULL);
632 } /* }}} void *connection_thread_main */
633
634 static int open_listen_socket_unix (const char *path) /* {{{ */
635 {
636   int fd;
637   struct sockaddr_un sa;
638   listen_socket_t *temp;
639   int status;
640
641   temp = (listen_socket_t *) realloc (listen_fds,
642       sizeof (listen_fds[0]) * (listen_fds_num + 1));
643   if (temp == NULL)
644   {
645     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
646     return (-1);
647   }
648   listen_fds = temp;
649   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
650
651   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
652   if (fd < 0)
653   {
654     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
655     return (-1);
656   }
657
658   memset (&sa, 0, sizeof (sa));
659   sa.sun_family = AF_UNIX;
660   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
661
662   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
663   if (status != 0)
664   {
665     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
666     close (fd);
667     unlink (path);
668     return (-1);
669   }
670
671   status = listen (fd, /* backlog = */ 10);
672   if (status != 0)
673   {
674     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
675     close (fd);
676     unlink (path);
677     return (-1);
678   }
679   
680   listen_fds[listen_fds_num].fd = fd;
681   snprintf (listen_fds[listen_fds_num].path,
682       sizeof (listen_fds[listen_fds_num].path) - 1,
683       "unix:%s", path);
684   listen_fds_num++;
685
686   return (0);
687 } /* }}} int open_listen_socket_unix */
688
689 static int open_listen_socket (const char *addr) /* {{{ */
690 {
691   struct addrinfo ai_hints;
692   struct addrinfo *ai_res;
693   struct addrinfo *ai_ptr;
694   int status;
695
696   assert (addr != NULL);
697
698   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
699     return (open_listen_socket_unix (addr + strlen ("unix:")));
700   else if (addr[0] == '/')
701     return (open_listen_socket_unix (addr));
702
703   memset (&ai_hints, 0, sizeof (ai_hints));
704   ai_hints.ai_flags = 0;
705 #ifdef AI_ADDRCONFIG
706   ai_hints.ai_flags |= AI_ADDRCONFIG;
707 #endif
708   ai_hints.ai_family = AF_UNSPEC;
709   ai_hints.ai_socktype = SOCK_STREAM;
710
711   ai_res = NULL;
712   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
713   if (status != 0)
714   {
715     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
716         "%s", addr, gai_strerror (status));
717     return (-1);
718   }
719
720   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
721   {
722     int fd;
723     listen_socket_t *temp;
724
725     temp = (listen_socket_t *) realloc (listen_fds,
726         sizeof (listen_fds[0]) * (listen_fds_num + 1));
727     if (temp == NULL)
728     {
729       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
730       continue;
731     }
732     listen_fds = temp;
733     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
734
735     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
736     if (fd < 0)
737     {
738       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
739       continue;
740     }
741
742     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
743     if (status != 0)
744     {
745       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
746       close (fd);
747       continue;
748     }
749
750     status = listen (fd, /* backlog = */ 10);
751     if (status != 0)
752     {
753       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
754       close (fd);
755       return (-1);
756     }
757
758     listen_fds[listen_fds_num].fd = fd;
759     strncpy (listen_fds[listen_fds_num].path, addr,
760         sizeof (listen_fds[listen_fds_num].path) - 1);
761     listen_fds_num++;
762   } /* for (ai_ptr) */
763
764   return (0);
765 } /* }}} int open_listen_socket */
766
767 static int close_listen_sockets (void) /* {{{ */
768 {
769   size_t i;
770
771   for (i = 0; i < listen_fds_num; i++)
772   {
773     close (listen_fds[i].fd);
774     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
775       unlink (listen_fds[i].path + strlen ("unix:"));
776   }
777
778   free (listen_fds);
779   listen_fds = NULL;
780   listen_fds_num = 0;
781
782   return (0);
783 } /* }}} int close_listen_sockets */
784
785 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
786 {
787   struct pollfd *pollfds;
788   int pollfds_num;
789   int status;
790   int i;
791
792   for (i = 0; i < config_listen_address_list_len; i++)
793   {
794     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
795         "= %s", i, config_listen_address_list[i]);
796     open_listen_socket (config_listen_address_list[i]);
797   }
798
799   if (config_listen_address_list_len < 1)
800     open_listen_socket (RRDD_SOCK_PATH);
801
802   if (listen_fds_num < 1)
803   {
804     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
805         "could be opened. Sorry.");
806     return (NULL);
807   }
808
809   pollfds_num = listen_fds_num;
810   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
811   if (pollfds == NULL)
812   {
813     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
814     return (NULL);
815   }
816   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
817
818   while (do_shutdown == 0)
819   {
820     assert (pollfds_num == ((int) listen_fds_num));
821     for (i = 0; i < pollfds_num; i++)
822     {
823       pollfds[i].fd = listen_fds[i].fd;
824       pollfds[i].events = POLLIN | POLLPRI;
825       pollfds[i].revents = 0;
826     }
827
828     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
829     if (status < 1)
830     {
831       status = errno;
832       if (status != EINTR)
833       {
834         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
835       }
836       continue;
837     }
838
839     for (i = 0; i < pollfds_num; i++)
840     {
841       int *client_sd;
842       struct sockaddr_storage client_sa;
843       socklen_t client_sa_size;
844       pthread_t tid;
845
846       if (pollfds[i].revents == 0)
847         continue;
848
849       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
850       {
851         RRDD_LOG (LOG_ERR, "listen_thread_main: "
852             "poll(2) returned something unexpected for listen FD #%i.",
853             pollfds[i].fd);
854         continue;
855       }
856
857       client_sd = (int *) malloc (sizeof (int));
858       if (client_sd == NULL)
859       {
860         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
861         continue;
862       }
863
864       client_sa_size = sizeof (client_sa);
865       *client_sd = accept (pollfds[i].fd,
866           (struct sockaddr *) &client_sa, &client_sa_size);
867       if (*client_sd < 0)
868       {
869         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
870         continue;
871       }
872
873       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
874           /* args = */ (void *) client_sd);
875       if (status != 0)
876       {
877         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
878         close (*client_sd);
879         free (client_sd);
880         continue;
881       }
882     } /* for (pollfds_num) */
883   } /* while (do_shutdown == 0) */
884
885   close_listen_sockets ();
886
887   pthread_mutex_lock (&connetion_threads_lock);
888   while (connetion_threads_num > 0)
889   {
890     pthread_t wait_for;
891
892     wait_for = connetion_threads[0];
893
894     pthread_mutex_unlock (&connetion_threads_lock);
895     pthread_join (wait_for, /* retval = */ NULL);
896     pthread_mutex_lock (&connetion_threads_lock);
897   }
898   pthread_mutex_unlock (&connetion_threads_lock);
899
900   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
901
902   return (NULL);
903 } /* }}} void *listen_thread_main */
904
905 static int daemonize (void) /* {{{ */
906 {
907   pid_t child;
908   int status;
909
910   child = fork ();
911   if (child < 0)
912   {
913     fprintf (stderr, "daemonize: fork(2) failed.\n");
914     return (-1);
915   }
916   else if (child > 0)
917   {
918     return (1);
919   }
920
921   /* Change into the /tmp directory. */
922   chdir ("/tmp");
923
924   /* Become session leader */
925   setsid ();
926
927   /* Open the first three file descriptors to /dev/null */
928   close (2);
929   close (1);
930   close (0);
931
932   open ("/dev/null", O_RDWR);
933   dup (0);
934   dup (0);
935
936   {
937     struct sigaction sa;
938
939     memset (&sa, 0, sizeof (sa));
940     sa.sa_handler = sig_int_handler;
941     sigaction (SIGINT, &sa, NULL);
942
943     memset (&sa, 0, sizeof (sa));
944     sa.sa_handler = sig_term_handler;
945     sigaction (SIGINT, &sa, NULL);
946
947     memset (&sa, 0, sizeof (sa));
948     sa.sa_handler = SIG_IGN;
949     sigaction (SIGPIPE, &sa, NULL);
950   }
951
952   openlog ("rrdd", LOG_PID, LOG_DAEMON);
953
954   cache_tree = g_tree_new ((GCompareFunc) strcmp);
955   if (cache_tree == NULL)
956   {
957     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
958     return (-1);
959   }
960
961   memset (&queue_thread, 0, sizeof (queue_thread));
962   status = pthread_create (&queue_thread, /* attr = */ NULL,
963       queue_thread_main, /* args = */ NULL);
964   if (status != 0)
965   {
966     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
967     return (-1);
968   }
969
970   return (0);
971 } /* }}} int daemonize */
972
973 static int cleanup (void) /* {{{ */
974 {
975   RRDD_LOG (LOG_DEBUG, "cleanup ()");
976
977   do_shutdown++;
978
979   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
980   pthread_cond_signal (&cache_cond);
981   pthread_join (queue_thread, /* return = */ NULL);
982   RRDD_LOG (LOG_DEBUG, "cleanup: done");
983
984   closelog ();
985
986   return (0);
987 } /* }}} int cleanup */
988
989 static int read_options (int argc, char **argv) /* {{{ */
990 {
991   int option;
992   int status = 0;
993
994   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
995   {
996     switch (option)
997     {
998       case 'l':
999       {
1000         char **temp;
1001
1002         temp = (char **) realloc (config_listen_address_list,
1003             sizeof (char *) * (config_listen_address_list_len + 1));
1004         if (temp == NULL)
1005         {
1006           fprintf (stderr, "read_options: realloc failed.\n");
1007           return (2);
1008         }
1009         config_listen_address_list = temp;
1010
1011         temp[config_listen_address_list_len] = strdup (optarg);
1012         if (temp[config_listen_address_list_len] == NULL)
1013         {
1014           fprintf (stderr, "read_options: strdup failed.\n");
1015           return (2);
1016         }
1017         config_listen_address_list_len++;
1018       }
1019       break;
1020
1021       case 'f':
1022       {
1023         int temp;
1024
1025         temp = atoi (optarg);
1026         if (temp > 0)
1027           config_flush_interval = temp;
1028         else
1029         {
1030           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1031           status = 3;
1032         }
1033       }
1034       break;
1035
1036       case 'w':
1037       {
1038         int temp;
1039
1040         temp = atoi (optarg);
1041         if (temp > 0)
1042           config_write_interval = temp;
1043         else
1044         {
1045           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1046           status = 2;
1047         }
1048       }
1049       break;
1050
1051       case 'h':
1052       case '?':
1053         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1054             "\n"
1055             "Usage: rrdd [options]\n"
1056             "\n"
1057             "Valid options are:\n"
1058             "  -l <address>  Socket address to listen to.\n"
1059             "  -w <seconds>  Interval in which to write data.\n"
1060             "  -f <seconds>  Interval in which to flush dead data.\n"
1061             "\n"
1062             "For more information and a detailed description of all options "
1063             "please refer\n"
1064             "to the rrdd(1) manual page.\n",
1065             VERSION);
1066         status = -1;
1067         break;
1068     } /* switch (option) */
1069   } /* while (getopt) */
1070
1071   return (status);
1072 } /* }}} int read_options */
1073
1074 int main (int argc, char **argv)
1075 {
1076   int status;
1077
1078   status = read_options (argc, argv);
1079   if (status != 0)
1080   {
1081     if (status < 0)
1082       status = 0;
1083     return (status);
1084   }
1085
1086   status = daemonize ();
1087   if (status == 1)
1088   {
1089     struct sigaction sigchld;
1090
1091     memset (&sigchld, 0, sizeof (sigchld));
1092     sigchld.sa_handler = SIG_IGN;
1093     sigaction (SIGCHLD, &sigchld, NULL);
1094
1095     return (0);
1096   }
1097   else if (status != 0)
1098   {
1099     fprintf (stderr, "daemonize failed, exiting.\n");
1100     return (1);
1101   }
1102
1103   listen_thread_main (NULL);
1104
1105   cleanup ();
1106
1107   return (0);
1108 } /* int main */
1109
1110 /*
1111  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1112  */