src/rrdd.[ch]: Implemented flushing of dead values once in a while.
[rrdd.git] / src / rrdd.c
1 /**
2  * collectd - src/rrdd.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #define RRDD_DEBUG 1
23
24 #include "rrdd.h"
25 #include <glib-2.0/glib.h>
26 #include <rrd.h>
27
28 #if RRDD_DEBUG
29 # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
30 #else
31 # define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
32 #endif
33
34 /*
35  * Types
36  */
37 struct listen_socket_s
38 {
39   int fd;
40   char path[PATH_MAX + 1];
41 };
42 typedef struct listen_socket_s listen_socket_t;
43
44 struct cache_item_s;
45 typedef struct cache_item_s cache_item_t;
46 struct cache_item_s
47 {
48   char *file;
49   char **values;
50   int values_num;
51   time_t last_flush_time;
52 #define CI_FLAGS_IN_TREE  0x01
53 #define CI_FLAGS_IN_QUEUE 0x02
54   int flags;
55
56   cache_item_t *next;
57 };
58
59 /*
60  * Variables
61  */
62 static listen_socket_t *listen_fds = NULL;
63 static size_t listen_fds_num = 0;
64
65 static int do_shutdown = 0;
66
67 static pthread_t queue_thread;
68
69 static pthread_t *connetion_threads = NULL;
70 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
71 static int connetion_threads_num = 0;
72
73 /* Cache stuff */
74 static GTree          *cache_tree = NULL;
75 static cache_item_t   *cache_queue_head = NULL;
76 static cache_item_t   *cache_queue_tail = NULL;
77 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
78 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
79
80 static int config_write_interval = 300;
81 static int config_flush_interval = 3600;
82
83 static char **config_listen_address_list = NULL;
84 static int config_listen_address_list_len = 0;
85
86 /* 
87  * Functions
88  */
89 static void sig_int_handler (int signal) /* {{{ */
90 {
91   do_shutdown++;
92 } /* }}} void sig_int_handler */
93
94 static void sig_term_handler (int signal) /* {{{ */
95 {
96   do_shutdown++;
97 } /* }}} void sig_term_handler */
98
99 /*
100  * enqueue_cache_item:
101  * `cache_lock' must be acquired before calling this function!
102  */
103 static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
104 {
105   RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
106       ci->file);
107
108   if (ci == NULL)
109     return (-1);
110
111   if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
112     return (-1);
113
114   assert (ci->next == NULL);
115
116   if (cache_queue_tail == NULL)
117     cache_queue_head = ci;
118   else
119     cache_queue_tail->next = ci;
120   cache_queue_tail = ci;
121
122   return (0);
123 } /* }}} int enqueue_cache_item */
124
125 /*
126  * tree_callback_flush:
127  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
128  * while this is in progress.
129  */
130 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
131     gpointer data)
132 {
133   cache_item_t *ci;
134   time_t now;
135
136   ci = (cache_item_t *) value;
137   now = *((time_t *) data);
138
139   if (((now - ci->last_flush_time) >= config_write_interval)
140       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
141     enqueue_cache_item (ci);
142
143   return (TRUE);
144 } /* }}} gboolean tree_callback_flush */
145
146 static void *queue_thread_main (void *args) /* {{{ */
147 {
148   struct timeval now;
149   struct timespec next_flush;
150
151   gettimeofday (&now, NULL);
152   next_flush.tv_sec = now.tv_sec + config_flush_interval;
153   next_flush.tv_nsec = 1000 * now.tv_usec;
154
155   pthread_mutex_lock (&cache_lock);
156   while ((do_shutdown == 0) || (cache_queue_head != NULL))
157   {
158     cache_item_t *ci;
159     char *file;
160     char **values;
161     int values_num;
162     int status;
163     int i;
164
165     /* First, check if it's time to do the cache flush. */
166     gettimeofday (&now, NULL);
167     if ((now.tv_sec > next_flush.tv_sec)
168         || ((now.tv_sec == next_flush.tv_sec)
169           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
170     {
171       time_t time_now;
172
173       /* Pass the current time as user data so that we don't need to call
174        * `time' for each node. */
175       time_now = time (NULL);
176
177       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
178
179       /* Determine the time of the next cache flush. */
180       while (next_flush.tv_sec < now.tv_sec)
181         next_flush.tv_sec += config_flush_interval;
182     }
183
184     /* Now, check if there's something to store away. If not, wait until
185      * something comes in or it's time to do the cache flush. */
186     if (cache_queue_head == NULL)
187     {
188       struct timespec timeout;
189
190       timeout.tv_sec = next_flush.tv_sec - now.tv_sec;
191       if (next_flush.tv_nsec < (1000 * now.tv_usec))
192       {
193         timeout.tv_sec--;
194         timeout.tv_nsec = 1000000000 + next_flush.tv_nsec
195           - (1000 * now.tv_usec);
196       }
197       else
198       {
199         timeout.tv_nsec = next_flush.tv_nsec - (1000 * now.tv_usec);
200       }
201
202       pthread_cond_timedwait (&cache_cond, &cache_lock, &timeout);
203     }
204
205     /* Check if a value has arrived. This may be NULL if we timed out or there
206      * was an interrupt such as a signal. */
207     if (cache_queue_head == NULL)
208       continue;
209
210     ci = cache_queue_head;
211
212     /* copy the relevant parts */
213     file = strdup (ci->file);
214     if (file == NULL)
215     {
216       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
217       continue;
218     }
219
220     values = ci->values;
221     values_num = ci->values_num;
222
223     ci->values = NULL;
224     ci->values_num = 0;
225
226     ci->last_flush_time = time (NULL);
227     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
228
229     cache_queue_head = ci->next;
230     if (cache_queue_head == NULL)
231       cache_queue_tail = NULL;
232     ci->next = NULL;
233
234     pthread_mutex_unlock (&cache_lock);
235
236     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
237         file, values_num, (void *) values);
238
239     status = rrd_update_r (file, NULL, values_num, (void *) values);
240     if (status != 0)
241     {
242       RRDD_LOG (LOG_ERR, "queue_thread_main: "
243           "rrd_update_r failed with status %i.",
244           status);
245     }
246
247     free (file);
248     for (i = 0; i < values_num; i++)
249       free (values[i]);
250
251     pthread_mutex_lock (&cache_lock);
252   } /* while (do_shutdown == 0) */
253   pthread_mutex_unlock (&cache_lock);
254
255   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
256
257   return (NULL);
258 } /* }}} void *queue_thread_main */
259
260 static int handle_request_update (int fd, /* {{{ */
261     char *buffer, int buffer_size)
262 {
263   char *file;
264   char *value;
265   char *buffer_ptr;
266   int values_num = 0;
267   int status;
268
269   time_t now;
270
271   cache_item_t *ci;
272   char answer[4096];
273
274   now = time (NULL);
275
276   RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
277       fd, (void *) buffer, buffer_size);
278
279   buffer_ptr = buffer;
280
281   file = buffer_ptr;
282   buffer_ptr += strlen (file) + 1;
283
284   pthread_mutex_lock (&cache_lock);
285
286   ci = g_tree_lookup (cache_tree, file);
287   if (ci == NULL)
288   {
289     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
290     if (ci == NULL)
291     {
292       pthread_mutex_unlock (&cache_lock);
293       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
294       return (-1);
295     }
296     memset (ci, 0, sizeof (cache_item_t));
297
298     ci->file = strdup (file);
299     if (ci->file == NULL)
300     {
301       pthread_mutex_unlock (&cache_lock);
302       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
303       free (ci);
304       return (-1);
305     }
306
307     ci->values = NULL;
308     ci->values_num = 0;
309     ci->last_flush_time = now;
310     ci->flags = CI_FLAGS_IN_TREE;
311
312     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
313
314     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
315         ci->file);
316   }
317   assert (ci != NULL);
318
319   while (*buffer_ptr != 0)
320   {
321     char **temp;
322
323     value = buffer_ptr;
324     buffer_ptr += strlen (value) + 1;
325
326     temp = (char **) realloc (ci->values,
327         sizeof (char *) * (ci->values_num + 1));
328     if (temp == NULL)
329     {
330       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
331       continue;
332     }
333     ci->values = temp;
334
335     ci->values[ci->values_num] = strdup (value);
336     if (ci->values[ci->values_num] == NULL)
337     {
338       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
339       continue;
340     }
341     ci->values_num++;
342
343     values_num++;
344   }
345
346   if (((now - ci->last_flush_time) >= config_write_interval)
347       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
348   {
349     enqueue_cache_item (ci);
350     pthread_cond_signal (&cache_cond);
351   }
352
353   pthread_mutex_unlock (&cache_lock);
354
355   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
356   answer[sizeof (answer) - 1] = 0;
357
358   status = write (fd, answer, sizeof (answer));
359   if (status < 0)
360   {
361     status = errno;
362     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
363     return (status);
364   }
365
366   return (0);
367 } /* }}} int handle_request_update */
368
369 static int handle_request (int fd) /* {{{ */
370 {
371   char buffer[4096];
372   int buffer_size;
373
374   RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
375
376   buffer_size = read (fd, buffer, sizeof (buffer));
377   if (buffer_size < 1)
378   {
379     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
380     return (-1);
381   }
382   assert (((size_t) buffer_size) <= sizeof (buffer));
383
384   if ((buffer[buffer_size - 2] != 0)
385       || (buffer[buffer_size - 1] != 0))
386   {
387     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
388     return (-1);
389   }
390
391   /* fields in the buffer a separated by null bytes. */
392   if (strcmp (buffer, "update") == 0)
393   {
394     int offset = strlen ("update") + 1;
395     return (handle_request_update (fd, buffer + offset,
396           buffer_size - offset));
397   }
398   else
399   {
400     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
401     return (-1);
402   }
403 } /* }}} int handle_request */
404
405 static void *connection_thread_main (void *args) /* {{{ */
406 {
407   pthread_t self;
408   int i;
409   int fd;
410   
411   fd = *((int *) args);
412
413   RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
414       "connetion_threads[]..");
415   pthread_mutex_lock (&connetion_threads_lock);
416   {
417     pthread_t *temp;
418
419     temp = (pthread_t *) realloc (connetion_threads,
420         sizeof (pthread_t) * (connetion_threads_num + 1));
421     if (temp == NULL)
422     {
423       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
424     }
425     else
426     {
427       connetion_threads = temp;
428       connetion_threads[connetion_threads_num] = pthread_self ();
429       connetion_threads_num++;
430     }
431   }
432   pthread_mutex_unlock (&connetion_threads_lock);
433   RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
434
435   while (do_shutdown == 0)
436   {
437     struct pollfd pollfd;
438     int status;
439
440     pollfd.fd = fd;
441     pollfd.events = POLLIN | POLLPRI;
442     pollfd.revents = 0;
443
444     status = poll (&pollfd, 1, /* timeout = */ 500);
445     if (status == 0) /* timeout */
446       continue;
447     else if (status < 0) /* error */
448     {
449       status = errno;
450       if (status == EINTR)
451         continue;
452       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
453       continue;
454     }
455
456     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
457     {
458       RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
459           "poll(2) returned POLLHUP.");
460       close (fd);
461       break;
462     }
463     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
464     {
465       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
466           "poll(2) returned something unexpected: %#04hx",
467           pollfd.revents);
468       close (fd);
469       break;
470     }
471
472     status = handle_request (fd);
473     if (status != 0)
474     {
475       close (fd);
476       break;
477     }
478   }
479
480   self = pthread_self ();
481   /* Remove this thread from the connection threads list */
482   pthread_mutex_lock (&connetion_threads_lock);
483   /* Find out own index in the array */
484   for (i = 0; i < connetion_threads_num; i++)
485     if (pthread_equal (connetion_threads[i], self) != 0)
486       break;
487   assert (i < connetion_threads_num);
488
489   /* Move the trailing threads forward. */
490   if (i < (connetion_threads_num - 1))
491   {
492     memmove (connetion_threads + i,
493         connetion_threads + i + 1,
494         sizeof (pthread_t) * (connetion_threads_num - i - 1));
495   }
496
497   connetion_threads_num--;
498   pthread_mutex_unlock (&connetion_threads_lock);
499
500   free (args);
501   return (NULL);
502 } /* }}} void *connection_thread_main */
503
504 static int open_listen_socket_unix (const char *path) /* {{{ */
505 {
506   int fd;
507   struct sockaddr_un sa;
508   listen_socket_t *temp;
509   int status;
510
511   temp = (listen_socket_t *) realloc (listen_fds,
512       sizeof (listen_fds[0]) * (listen_fds_num + 1));
513   if (temp == NULL)
514   {
515     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
516     return (-1);
517   }
518   listen_fds = temp;
519   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
520
521   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
522   if (fd < 0)
523   {
524     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
525     return (-1);
526   }
527
528   memset (&sa, 0, sizeof (sa));
529   sa.sun_family = AF_UNIX;
530   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
531
532   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
533   if (status != 0)
534   {
535     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
536     close (fd);
537     unlink (path);
538     return (-1);
539   }
540
541   status = listen (fd, /* backlog = */ 10);
542   if (status != 0)
543   {
544     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
545     close (fd);
546     unlink (path);
547     return (-1);
548   }
549   
550   listen_fds[listen_fds_num].fd = fd;
551   snprintf (listen_fds[listen_fds_num].path,
552       sizeof (listen_fds[listen_fds_num].path) - 1,
553       "unix:%s", path);
554   listen_fds_num++;
555
556   return (0);
557 } /* }}} int open_listen_socket_unix */
558
559 static int open_listen_socket (const char *addr) /* {{{ */
560 {
561   struct addrinfo ai_hints;
562   struct addrinfo *ai_res;
563   struct addrinfo *ai_ptr;
564   int status;
565
566   assert (addr != NULL);
567
568   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
569     return (open_listen_socket_unix (addr + strlen ("unix:")));
570   else if (addr[0] == '/')
571     return (open_listen_socket_unix (addr));
572
573   memset (&ai_hints, 0, sizeof (ai_hints));
574   ai_hints.ai_flags = 0;
575 #ifdef AI_ADDRCONFIG
576   ai_hints.ai_flags |= AI_ADDRCONFIG;
577 #endif
578   ai_hints.ai_family = AF_UNSPEC;
579   ai_hints.ai_socktype = SOCK_STREAM;
580
581   ai_res = NULL;
582   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
583   if (status != 0)
584   {
585     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
586         "%s", addr, gai_strerror (status));
587     return (-1);
588   }
589
590   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
591   {
592     int fd;
593     listen_socket_t *temp;
594
595     temp = (listen_socket_t *) realloc (listen_fds,
596         sizeof (listen_fds[0]) * (listen_fds_num + 1));
597     if (temp == NULL)
598     {
599       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
600       continue;
601     }
602     listen_fds = temp;
603     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
604
605     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
606     if (fd < 0)
607     {
608       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
609       continue;
610     }
611
612     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
613     if (status != 0)
614     {
615       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
616       close (fd);
617       continue;
618     }
619
620     status = listen (fd, /* backlog = */ 10);
621     if (status != 0)
622     {
623       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
624       close (fd);
625       return (-1);
626     }
627
628     listen_fds[listen_fds_num].fd = fd;
629     strncpy (listen_fds[listen_fds_num].path, addr,
630         sizeof (listen_fds[listen_fds_num].path) - 1);
631     listen_fds_num++;
632   } /* for (ai_ptr) */
633
634   return (0);
635 } /* }}} int open_listen_socket */
636
637 static int close_listen_sockets (void) /* {{{ */
638 {
639   size_t i;
640
641   for (i = 0; i < listen_fds_num; i++)
642   {
643     close (listen_fds[i].fd);
644     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
645       unlink (listen_fds[i].path + strlen ("unix:"));
646   }
647
648   free (listen_fds);
649   listen_fds = NULL;
650   listen_fds_num = 0;
651
652   return (0);
653 } /* }}} int close_listen_sockets */
654
655 static void *listen_thread_main (void *args) /* {{{ */
656 {
657   struct pollfd *pollfds;
658   int pollfds_num;
659   int status;
660   int i;
661
662   for (i = 0; i < config_listen_address_list_len; i++)
663   {
664     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
665         "= %s", i, config_listen_address_list[i]);
666     open_listen_socket (config_listen_address_list[i]);
667   }
668
669   if (config_listen_address_list_len < 1)
670     open_listen_socket (RRDD_SOCK_PATH);
671
672   if (listen_fds_num < 1)
673   {
674     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
675         "could be opened. Sorry.");
676     return (NULL);
677   }
678
679   pollfds_num = listen_fds_num;
680   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
681   if (pollfds == NULL)
682   {
683     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
684     return (NULL);
685   }
686   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
687
688   while (do_shutdown == 0)
689   {
690     assert (pollfds_num == listen_fds_num);
691     for (i = 0; i < pollfds_num; i++)
692     {
693       pollfds[i].fd = listen_fds[i].fd;
694       pollfds[i].events = POLLIN | POLLPRI;
695       pollfds[i].revents = 0;
696     }
697
698     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
699     if (status < 1)
700     {
701       status = errno;
702       if (status != EINTR)
703       {
704         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
705       }
706       continue;
707     }
708
709     for (i = 0; i < pollfds_num; i++)
710     {
711       int *client_sd;
712       struct sockaddr_storage client_sa;
713       socklen_t client_sa_size;
714       pthread_t tid;
715
716       if (pollfds[i].revents == 0)
717         continue;
718
719       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
720       {
721         RRDD_LOG (LOG_ERR, "listen_thread_main: "
722             "poll(2) returned something unexpected for listen FD #%i.",
723             pollfds[i].fd);
724         continue;
725       }
726
727       client_sd = (int *) malloc (sizeof (int));
728       if (client_sd == NULL)
729       {
730         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
731         continue;
732       }
733
734       client_sa_size = sizeof (client_sa);
735       *client_sd = accept (pollfds[i].fd,
736           (struct sockaddr *) &client_sa, &client_sa_size);
737       if (*client_sd < 0)
738       {
739         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
740         continue;
741       }
742
743       RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
744           *client_sd);
745
746       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
747           /* args = */ (void *) client_sd);
748       if (status != 0)
749       {
750         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
751         close (*client_sd);
752         free (client_sd);
753         continue;
754       }
755
756       RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
757           "tid = %lu",
758           *((unsigned long *) &tid));
759     } /* for (pollfds_num) */
760   } /* while (do_shutdown == 0) */
761
762   close_listen_sockets ();
763
764   pthread_mutex_lock (&connetion_threads_lock);
765   while (connetion_threads_num > 0)
766   {
767     pthread_t wait_for;
768
769     wait_for = connetion_threads[0];
770
771     pthread_mutex_unlock (&connetion_threads_lock);
772     pthread_join (wait_for, /* retval = */ NULL);
773     pthread_mutex_lock (&connetion_threads_lock);
774   }
775   pthread_mutex_unlock (&connetion_threads_lock);
776
777   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
778
779   return (NULL);
780 } /* }}} void *listen_thread_main */
781
782 static int daemonize (void) /* {{{ */
783 {
784 #if !RRDD_DEBUG
785   pid_t child;
786 #endif
787   int status;
788
789 #if !RRDD_DEBUG
790   child = fork ();
791   if (child < 0)
792   {
793     fprintf (stderr, "daemonize: fork(2) failed.\n");
794     return (-1);
795   }
796   else if (child > 0)
797   {
798     return (1);
799   }
800
801   /* Change into the /tmp directory. */
802   chdir ("/tmp");
803
804   /* Become session leader */
805   setsid ();
806
807   /* Open the first three file descriptors to /dev/null */
808   close (2);
809   close (1);
810   close (0);
811
812   open ("/dev/null", O_RDWR);
813   dup (0);
814   dup (0);
815 #endif /* RRDD_DEBUG */
816
817   {
818     struct sigaction sa;
819
820     memset (&sa, 0, sizeof (sa));
821     sa.sa_handler = sig_int_handler;
822     sigaction (SIGINT, &sa, NULL);
823
824     memset (&sa, 0, sizeof (sa));
825     sa.sa_handler = sig_term_handler;
826     sigaction (SIGINT, &sa, NULL);
827
828     memset (&sa, 0, sizeof (sa));
829     sa.sa_handler = SIG_IGN;
830     sigaction (SIGPIPE, &sa, NULL);
831   }
832
833   openlog ("rrdd", LOG_PID, LOG_DAEMON);
834
835   cache_tree = g_tree_new ((GCompareFunc) strcmp);
836   if (cache_tree == NULL)
837   {
838     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
839     return (-1);
840   }
841
842   memset (&queue_thread, 0, sizeof (queue_thread));
843   status = pthread_create (&queue_thread, /* attr = */ NULL,
844       queue_thread_main, /* args = */ NULL);
845   if (status != 0)
846   {
847     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
848     return (-1);
849   }
850
851   return (0);
852 } /* }}} int daemonize */
853
854 static int cleanup (void) /* {{{ */
855 {
856   RRDD_LOG (LOG_DEBUG, "cleanup ()");
857
858   do_shutdown++;
859
860   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
861   pthread_cond_signal (&cache_cond);
862   pthread_join (queue_thread, /* return = */ NULL);
863   RRDD_LOG (LOG_DEBUG, "cleanup: done");
864
865   closelog ();
866
867   return (0);
868 } /* }}} int cleanup */
869
870 static int read_options (int argc, char **argv) /* {{{ */
871 {
872   int option;
873   int status = 0;
874
875   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
876   {
877     switch (option)
878     {
879       case 'l':
880       {
881         char **temp;
882
883         temp = (char **) realloc (config_listen_address_list,
884             sizeof (char *) * (config_listen_address_list_len + 1));
885         if (temp == NULL)
886         {
887           fprintf (stderr, "read_options: realloc failed.\n");
888           return (2);
889         }
890         config_listen_address_list = temp;
891
892         temp[config_listen_address_list_len] = strdup (optarg);
893         if (temp[config_listen_address_list_len] == NULL)
894         {
895           fprintf (stderr, "read_options: strdup failed.\n");
896           return (2);
897         }
898         config_listen_address_list_len++;
899       }
900       break;
901
902       case 'f':
903       {
904         int temp;
905
906         temp = atoi (optarg);
907         if (temp > 0)
908           config_flush_interval = temp;
909         else
910         {
911           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
912           status = 3;
913         }
914       }
915       break;
916
917       case 'w':
918       {
919         int temp;
920
921         temp = atoi (optarg);
922         if (temp > 0)
923           config_write_interval = temp;
924         else
925         {
926           fprintf (stderr, "Invalid write interval: %s\n", optarg);
927           status = 2;
928         }
929       }
930       break;
931
932       case 'h':
933       case '?':
934         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
935             "\n"
936             "Usage: rrdd [options]\n"
937             "\n"
938             "Valid options are:\n"
939             "  -l <address>  Socket address to listen to.\n"
940             "  -w <seconds>  Interval in which to write data.\n"
941             "  -f <seconds>  Interval in which to flush dead data.\n"
942             "\n"
943             "For more information and a detailed description of all options "
944             "please refer\n"
945             "to the rrdd(1) manual page.\n",
946             PACKAGE_VERSION);
947         status = -1;
948         break;
949     } /* switch (option) */
950   } /* while (getopt) */
951
952   return (status);
953 } /* }}} int read_options */
954
955 int main (int argc, char **argv)
956 {
957   int status;
958
959   status = read_options (argc, argv);
960   if (status != 0)
961   {
962     if (status < 0)
963       status = 0;
964     return (status);
965   }
966
967   status = daemonize ();
968   if (status == 1)
969   {
970     struct sigaction sigchld;
971
972     memset (&sigchld, 0, sizeof (sigchld));
973     sigchld.sa_handler = SIG_IGN;
974     sigaction (SIGCHLD, &sigchld, NULL);
975
976     return (0);
977   }
978   else if (status != 0)
979   {
980     fprintf (stderr, "daemonize failed, exiting.\n");
981     return (1);
982   }
983
984   listen_thread_main (NULL);
985
986   cleanup ();
987
988   return (0);
989 } /* int main */
990
991 /*
992  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
993  */