src/rrd_daemon.c: pthread_cond_timedwait expects an _absolute_ time, d'oh!
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 #if 0
95 /* FIXME: I don't want to declare strdup myself! */
96 extern char *strdup (const char *s);
97 #endif
98
99 /*
100  * Types
101  */
102 struct listen_socket_s
103 {
104   int fd;
105   char path[PATH_MAX + 1];
106 };
107 typedef struct listen_socket_s listen_socket_t;
108
109 struct cache_item_s;
110 typedef struct cache_item_s cache_item_t;
111 struct cache_item_s
112 {
113   char *file;
114   char **values;
115   int values_num;
116   time_t last_flush_time;
117 #define CI_FLAGS_IN_TREE  0x01
118 #define CI_FLAGS_IN_QUEUE 0x02
119   int flags;
120
121   cache_item_t *next;
122 };
123
124 /*
125  * Variables
126  */
127 static listen_socket_t *listen_fds = NULL;
128 static size_t listen_fds_num = 0;
129
130 static int do_shutdown = 0;
131
132 static pthread_t queue_thread;
133
134 static pthread_t *connetion_threads = NULL;
135 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
136 static int connetion_threads_num = 0;
137
138 /* Cache stuff */
139 static GTree          *cache_tree = NULL;
140 static cache_item_t   *cache_queue_head = NULL;
141 static cache_item_t   *cache_queue_tail = NULL;
142 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
143 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
144
145 static int config_write_interval = 300;
146 static int config_flush_interval = 3600;
147
148 static char **config_listen_address_list = NULL;
149 static int config_listen_address_list_len = 0;
150
151 /* 
152  * Functions
153  */
154 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
155 {
156   do_shutdown++;
157 } /* }}} void sig_int_handler */
158
159 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
160 {
161   do_shutdown++;
162 } /* }}} void sig_term_handler */
163
164 /*
165  * enqueue_cache_item:
166  * `cache_lock' must be acquired before calling this function!
167  */
168 static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
169 {
170   RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
171       ci->file);
172
173   if (ci == NULL)
174     return (-1);
175
176   if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
177     return (-1);
178
179   assert (ci->next == NULL);
180
181   if (cache_queue_tail == NULL)
182     cache_queue_head = ci;
183   else
184     cache_queue_tail->next = ci;
185   cache_queue_tail = ci;
186
187   return (0);
188 } /* }}} int enqueue_cache_item */
189
190 /*
191  * tree_callback_flush:
192  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
193  * while this is in progress.
194  */
195 static gboolean tree_callback_flush (gpointer key /* {{{ */
196     __attribute__((unused)), gpointer value, gpointer data)
197 {
198   cache_item_t *ci;
199   time_t now;
200
201   key = NULL; /* make compiler happy */
202
203   ci = (cache_item_t *) value;
204   now = *((time_t *) data);
205
206   if (((now - ci->last_flush_time) >= config_write_interval)
207       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
208     enqueue_cache_item (ci);
209
210   return (TRUE);
211 } /* }}} gboolean tree_callback_flush */
212
213 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
214 {
215   struct timeval now;
216   struct timespec next_flush;
217
218   gettimeofday (&now, NULL);
219   next_flush.tv_sec = now.tv_sec + config_flush_interval;
220   next_flush.tv_nsec = 1000 * now.tv_usec;
221
222   pthread_mutex_lock (&cache_lock);
223   while ((do_shutdown == 0) || (cache_queue_head != NULL))
224   {
225     cache_item_t *ci;
226     char *file;
227     char **values;
228     int values_num;
229     int status;
230     int i;
231
232     /* First, check if it's time to do the cache flush. */
233     gettimeofday (&now, NULL);
234     if ((now.tv_sec > next_flush.tv_sec)
235         || ((now.tv_sec == next_flush.tv_sec)
236           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
237     {
238       time_t time_now;
239
240       /* Pass the current time as user data so that we don't need to call
241        * `time' for each node. */
242       time_now = time (NULL);
243
244       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
245
246       /* Determine the time of the next cache flush. */
247       while (next_flush.tv_sec < now.tv_sec)
248         next_flush.tv_sec += config_flush_interval;
249     }
250
251     /* Now, check if there's something to store away. If not, wait until
252      * something comes in or it's time to do the cache flush. */
253     if (cache_queue_head == NULL)
254     {
255       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
256       if ((status != 0) && (status != ETIMEDOUT))
257       {
258         RRDD_LOG (LOG_ERR, "queue_thread_main: "
259             "pthread_cond_timedwait returned %i.", status);
260       }
261     }
262
263     /* Check if a value has arrived. This may be NULL if we timed out or there
264      * was an interrupt such as a signal. */
265     if (cache_queue_head == NULL)
266       continue;
267
268     ci = cache_queue_head;
269
270     /* copy the relevant parts */
271     file = strdup (ci->file);
272     if (file == NULL)
273     {
274       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
275       continue;
276     }
277
278     values = ci->values;
279     values_num = ci->values_num;
280
281     ci->values = NULL;
282     ci->values_num = 0;
283
284     ci->last_flush_time = time (NULL);
285     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
286
287     cache_queue_head = ci->next;
288     if (cache_queue_head == NULL)
289       cache_queue_tail = NULL;
290     ci->next = NULL;
291
292     pthread_mutex_unlock (&cache_lock);
293
294     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
295         file, values_num, (void *) values);
296
297     status = rrd_update_r (file, NULL, values_num, (void *) values);
298     if (status != 0)
299     {
300       RRDD_LOG (LOG_ERR, "queue_thread_main: "
301           "rrd_update_r failed with status %i.",
302           status);
303     }
304
305     free (file);
306     for (i = 0; i < values_num; i++)
307       free (values[i]);
308
309     pthread_mutex_lock (&cache_lock);
310   } /* while (do_shutdown == 0) */
311   pthread_mutex_unlock (&cache_lock);
312
313   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
314
315   return (NULL);
316 } /* }}} void *queue_thread_main */
317
318 static int handle_request_update (int fd, /* {{{ */
319     char *buffer, int buffer_size)
320 {
321   char *file;
322   char *value;
323   char *buffer_ptr;
324   int values_num = 0;
325   int status;
326
327   time_t now;
328
329   cache_item_t *ci;
330   char answer[4096];
331
332   now = time (NULL);
333
334   RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
335       fd, (void *) buffer, buffer_size);
336
337   buffer_ptr = buffer;
338
339   file = buffer_ptr;
340   buffer_ptr += strlen (file) + 1;
341
342   pthread_mutex_lock (&cache_lock);
343
344   ci = g_tree_lookup (cache_tree, file);
345   if (ci == NULL)
346   {
347     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
348     if (ci == NULL)
349     {
350       pthread_mutex_unlock (&cache_lock);
351       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
352       return (-1);
353     }
354     memset (ci, 0, sizeof (cache_item_t));
355
356     ci->file = strdup (file);
357     if (ci->file == NULL)
358     {
359       pthread_mutex_unlock (&cache_lock);
360       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
361       free (ci);
362       return (-1);
363     }
364
365     ci->values = NULL;
366     ci->values_num = 0;
367     ci->last_flush_time = now;
368     ci->flags = CI_FLAGS_IN_TREE;
369
370     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
371
372     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
373         ci->file);
374   }
375   assert (ci != NULL);
376
377   while (*buffer_ptr != 0)
378   {
379     char **temp;
380
381     value = buffer_ptr;
382     buffer_ptr += strlen (value) + 1;
383
384     temp = (char **) realloc (ci->values,
385         sizeof (char *) * (ci->values_num + 1));
386     if (temp == NULL)
387     {
388       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
389       continue;
390     }
391     ci->values = temp;
392
393     ci->values[ci->values_num] = strdup (value);
394     if (ci->values[ci->values_num] == NULL)
395     {
396       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
397       continue;
398     }
399     ci->values_num++;
400
401     values_num++;
402   }
403
404   if (((now - ci->last_flush_time) >= config_write_interval)
405       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
406   {
407     enqueue_cache_item (ci);
408     pthread_cond_signal (&cache_cond);
409   }
410
411   pthread_mutex_unlock (&cache_lock);
412
413   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
414   answer[sizeof (answer) - 1] = 0;
415
416   status = write (fd, answer, sizeof (answer));
417   if (status < 0)
418   {
419     status = errno;
420     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
421     return (status);
422   }
423
424   return (0);
425 } /* }}} int handle_request_update */
426
427 static int handle_request (int fd) /* {{{ */
428 {
429   char buffer[4096];
430   int buffer_size;
431
432   RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
433
434   buffer_size = read (fd, buffer, sizeof (buffer));
435   if (buffer_size < 1)
436   {
437     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
438     return (-1);
439   }
440   assert (((size_t) buffer_size) <= sizeof (buffer));
441
442   if ((buffer[buffer_size - 2] != 0)
443       || (buffer[buffer_size - 1] != 0))
444   {
445     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
446     return (-1);
447   }
448
449   /* fields in the buffer a separated by null bytes. */
450   if (strcmp (buffer, "update") == 0)
451   {
452     int offset = strlen ("update") + 1;
453     return (handle_request_update (fd, buffer + offset,
454           buffer_size - offset));
455   }
456   else
457   {
458     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
459     return (-1);
460   }
461 } /* }}} int handle_request */
462
463 static void *connection_thread_main (void *args /* {{{ */
464     __attribute__((unused)))
465 {
466   pthread_t self;
467   int i;
468   int fd;
469   
470   fd = *((int *) args);
471
472   RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
473       "connetion_threads[]..");
474   pthread_mutex_lock (&connetion_threads_lock);
475   {
476     pthread_t *temp;
477
478     temp = (pthread_t *) realloc (connetion_threads,
479         sizeof (pthread_t) * (connetion_threads_num + 1));
480     if (temp == NULL)
481     {
482       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
483     }
484     else
485     {
486       connetion_threads = temp;
487       connetion_threads[connetion_threads_num] = pthread_self ();
488       connetion_threads_num++;
489     }
490   }
491   pthread_mutex_unlock (&connetion_threads_lock);
492   RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
493
494   while (do_shutdown == 0)
495   {
496     struct pollfd pollfd;
497     int status;
498
499     pollfd.fd = fd;
500     pollfd.events = POLLIN | POLLPRI;
501     pollfd.revents = 0;
502
503     status = poll (&pollfd, 1, /* timeout = */ 500);
504     if (status == 0) /* timeout */
505       continue;
506     else if (status < 0) /* error */
507     {
508       status = errno;
509       if (status == EINTR)
510         continue;
511       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
512       continue;
513     }
514
515     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
516     {
517       RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
518           "poll(2) returned POLLHUP.");
519       close (fd);
520       break;
521     }
522     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
523     {
524       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
525           "poll(2) returned something unexpected: %#04hx",
526           pollfd.revents);
527       close (fd);
528       break;
529     }
530
531     status = handle_request (fd);
532     if (status != 0)
533     {
534       close (fd);
535       break;
536     }
537   }
538
539   self = pthread_self ();
540   /* Remove this thread from the connection threads list */
541   pthread_mutex_lock (&connetion_threads_lock);
542   /* Find out own index in the array */
543   for (i = 0; i < connetion_threads_num; i++)
544     if (pthread_equal (connetion_threads[i], self) != 0)
545       break;
546   assert (i < connetion_threads_num);
547
548   /* Move the trailing threads forward. */
549   if (i < (connetion_threads_num - 1))
550   {
551     memmove (connetion_threads + i,
552         connetion_threads + i + 1,
553         sizeof (pthread_t) * (connetion_threads_num - i - 1));
554   }
555
556   connetion_threads_num--;
557   pthread_mutex_unlock (&connetion_threads_lock);
558
559   free (args);
560   return (NULL);
561 } /* }}} void *connection_thread_main */
562
563 static int open_listen_socket_unix (const char *path) /* {{{ */
564 {
565   int fd;
566   struct sockaddr_un sa;
567   listen_socket_t *temp;
568   int status;
569
570   temp = (listen_socket_t *) realloc (listen_fds,
571       sizeof (listen_fds[0]) * (listen_fds_num + 1));
572   if (temp == NULL)
573   {
574     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
575     return (-1);
576   }
577   listen_fds = temp;
578   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
579
580   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
581   if (fd < 0)
582   {
583     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
584     return (-1);
585   }
586
587   memset (&sa, 0, sizeof (sa));
588   sa.sun_family = AF_UNIX;
589   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
590
591   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
592   if (status != 0)
593   {
594     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
595     close (fd);
596     unlink (path);
597     return (-1);
598   }
599
600   status = listen (fd, /* backlog = */ 10);
601   if (status != 0)
602   {
603     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
604     close (fd);
605     unlink (path);
606     return (-1);
607   }
608   
609   listen_fds[listen_fds_num].fd = fd;
610   snprintf (listen_fds[listen_fds_num].path,
611       sizeof (listen_fds[listen_fds_num].path) - 1,
612       "unix:%s", path);
613   listen_fds_num++;
614
615   return (0);
616 } /* }}} int open_listen_socket_unix */
617
618 static int open_listen_socket (const char *addr) /* {{{ */
619 {
620   struct addrinfo ai_hints;
621   struct addrinfo *ai_res;
622   struct addrinfo *ai_ptr;
623   int status;
624
625   assert (addr != NULL);
626
627   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
628     return (open_listen_socket_unix (addr + strlen ("unix:")));
629   else if (addr[0] == '/')
630     return (open_listen_socket_unix (addr));
631
632   memset (&ai_hints, 0, sizeof (ai_hints));
633   ai_hints.ai_flags = 0;
634 #ifdef AI_ADDRCONFIG
635   ai_hints.ai_flags |= AI_ADDRCONFIG;
636 #endif
637   ai_hints.ai_family = AF_UNSPEC;
638   ai_hints.ai_socktype = SOCK_STREAM;
639
640   ai_res = NULL;
641   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
642   if (status != 0)
643   {
644     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
645         "%s", addr, gai_strerror (status));
646     return (-1);
647   }
648
649   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
650   {
651     int fd;
652     listen_socket_t *temp;
653
654     temp = (listen_socket_t *) realloc (listen_fds,
655         sizeof (listen_fds[0]) * (listen_fds_num + 1));
656     if (temp == NULL)
657     {
658       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
659       continue;
660     }
661     listen_fds = temp;
662     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
663
664     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
665     if (fd < 0)
666     {
667       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
668       continue;
669     }
670
671     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
672     if (status != 0)
673     {
674       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
675       close (fd);
676       continue;
677     }
678
679     status = listen (fd, /* backlog = */ 10);
680     if (status != 0)
681     {
682       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
683       close (fd);
684       return (-1);
685     }
686
687     listen_fds[listen_fds_num].fd = fd;
688     strncpy (listen_fds[listen_fds_num].path, addr,
689         sizeof (listen_fds[listen_fds_num].path) - 1);
690     listen_fds_num++;
691   } /* for (ai_ptr) */
692
693   return (0);
694 } /* }}} int open_listen_socket */
695
696 static int close_listen_sockets (void) /* {{{ */
697 {
698   size_t i;
699
700   for (i = 0; i < listen_fds_num; i++)
701   {
702     close (listen_fds[i].fd);
703     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
704       unlink (listen_fds[i].path + strlen ("unix:"));
705   }
706
707   free (listen_fds);
708   listen_fds = NULL;
709   listen_fds_num = 0;
710
711   return (0);
712 } /* }}} int close_listen_sockets */
713
714 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
715 {
716   struct pollfd *pollfds;
717   int pollfds_num;
718   int status;
719   int i;
720
721   for (i = 0; i < config_listen_address_list_len; i++)
722   {
723     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
724         "= %s", i, config_listen_address_list[i]);
725     open_listen_socket (config_listen_address_list[i]);
726   }
727
728   if (config_listen_address_list_len < 1)
729     open_listen_socket (RRDD_SOCK_PATH);
730
731   if (listen_fds_num < 1)
732   {
733     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
734         "could be opened. Sorry.");
735     return (NULL);
736   }
737
738   pollfds_num = listen_fds_num;
739   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
740   if (pollfds == NULL)
741   {
742     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
743     return (NULL);
744   }
745   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
746
747   while (do_shutdown == 0)
748   {
749     assert (pollfds_num == ((int) listen_fds_num));
750     for (i = 0; i < pollfds_num; i++)
751     {
752       pollfds[i].fd = listen_fds[i].fd;
753       pollfds[i].events = POLLIN | POLLPRI;
754       pollfds[i].revents = 0;
755     }
756
757     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
758     if (status < 1)
759     {
760       status = errno;
761       if (status != EINTR)
762       {
763         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
764       }
765       continue;
766     }
767
768     for (i = 0; i < pollfds_num; i++)
769     {
770       int *client_sd;
771       struct sockaddr_storage client_sa;
772       socklen_t client_sa_size;
773       pthread_t tid;
774
775       if (pollfds[i].revents == 0)
776         continue;
777
778       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
779       {
780         RRDD_LOG (LOG_ERR, "listen_thread_main: "
781             "poll(2) returned something unexpected for listen FD #%i.",
782             pollfds[i].fd);
783         continue;
784       }
785
786       client_sd = (int *) malloc (sizeof (int));
787       if (client_sd == NULL)
788       {
789         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
790         continue;
791       }
792
793       client_sa_size = sizeof (client_sa);
794       *client_sd = accept (pollfds[i].fd,
795           (struct sockaddr *) &client_sa, &client_sa_size);
796       if (*client_sd < 0)
797       {
798         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
799         continue;
800       }
801
802       RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
803           *client_sd);
804
805       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
806           /* args = */ (void *) client_sd);
807       if (status != 0)
808       {
809         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
810         close (*client_sd);
811         free (client_sd);
812         continue;
813       }
814
815       RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
816           "tid = %lu",
817           *((unsigned long *) &tid));
818     } /* for (pollfds_num) */
819   } /* while (do_shutdown == 0) */
820
821   close_listen_sockets ();
822
823   pthread_mutex_lock (&connetion_threads_lock);
824   while (connetion_threads_num > 0)
825   {
826     pthread_t wait_for;
827
828     wait_for = connetion_threads[0];
829
830     pthread_mutex_unlock (&connetion_threads_lock);
831     pthread_join (wait_for, /* retval = */ NULL);
832     pthread_mutex_lock (&connetion_threads_lock);
833   }
834   pthread_mutex_unlock (&connetion_threads_lock);
835
836   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
837
838   return (NULL);
839 } /* }}} void *listen_thread_main */
840
841 static int daemonize (void) /* {{{ */
842 {
843   pid_t child;
844   int status;
845
846   child = fork ();
847   if (child < 0)
848   {
849     fprintf (stderr, "daemonize: fork(2) failed.\n");
850     return (-1);
851   }
852   else if (child > 0)
853   {
854     return (1);
855   }
856
857   /* Change into the /tmp directory. */
858   chdir ("/tmp");
859
860   /* Become session leader */
861   setsid ();
862
863   /* Open the first three file descriptors to /dev/null */
864   close (2);
865   close (1);
866   close (0);
867
868   open ("/dev/null", O_RDWR);
869   dup (0);
870   dup (0);
871
872   {
873     struct sigaction sa;
874
875     memset (&sa, 0, sizeof (sa));
876     sa.sa_handler = sig_int_handler;
877     sigaction (SIGINT, &sa, NULL);
878
879     memset (&sa, 0, sizeof (sa));
880     sa.sa_handler = sig_term_handler;
881     sigaction (SIGINT, &sa, NULL);
882
883     memset (&sa, 0, sizeof (sa));
884     sa.sa_handler = SIG_IGN;
885     sigaction (SIGPIPE, &sa, NULL);
886   }
887
888   openlog ("rrdd", LOG_PID, LOG_DAEMON);
889
890   cache_tree = g_tree_new ((GCompareFunc) strcmp);
891   if (cache_tree == NULL)
892   {
893     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
894     return (-1);
895   }
896
897   memset (&queue_thread, 0, sizeof (queue_thread));
898   status = pthread_create (&queue_thread, /* attr = */ NULL,
899       queue_thread_main, /* args = */ NULL);
900   if (status != 0)
901   {
902     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
903     return (-1);
904   }
905
906   return (0);
907 } /* }}} int daemonize */
908
909 static int cleanup (void) /* {{{ */
910 {
911   RRDD_LOG (LOG_DEBUG, "cleanup ()");
912
913   do_shutdown++;
914
915   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
916   pthread_cond_signal (&cache_cond);
917   pthread_join (queue_thread, /* return = */ NULL);
918   RRDD_LOG (LOG_DEBUG, "cleanup: done");
919
920   closelog ();
921
922   return (0);
923 } /* }}} int cleanup */
924
925 static int read_options (int argc, char **argv) /* {{{ */
926 {
927   int option;
928   int status = 0;
929
930   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
931   {
932     switch (option)
933     {
934       case 'l':
935       {
936         char **temp;
937
938         temp = (char **) realloc (config_listen_address_list,
939             sizeof (char *) * (config_listen_address_list_len + 1));
940         if (temp == NULL)
941         {
942           fprintf (stderr, "read_options: realloc failed.\n");
943           return (2);
944         }
945         config_listen_address_list = temp;
946
947         temp[config_listen_address_list_len] = strdup (optarg);
948         if (temp[config_listen_address_list_len] == NULL)
949         {
950           fprintf (stderr, "read_options: strdup failed.\n");
951           return (2);
952         }
953         config_listen_address_list_len++;
954       }
955       break;
956
957       case 'f':
958       {
959         int temp;
960
961         temp = atoi (optarg);
962         if (temp > 0)
963           config_flush_interval = temp;
964         else
965         {
966           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
967           status = 3;
968         }
969       }
970       break;
971
972       case 'w':
973       {
974         int temp;
975
976         temp = atoi (optarg);
977         if (temp > 0)
978           config_write_interval = temp;
979         else
980         {
981           fprintf (stderr, "Invalid write interval: %s\n", optarg);
982           status = 2;
983         }
984       }
985       break;
986
987       case 'h':
988       case '?':
989         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
990             "\n"
991             "Usage: rrdd [options]\n"
992             "\n"
993             "Valid options are:\n"
994             "  -l <address>  Socket address to listen to.\n"
995             "  -w <seconds>  Interval in which to write data.\n"
996             "  -f <seconds>  Interval in which to flush dead data.\n"
997             "\n"
998             "For more information and a detailed description of all options "
999             "please refer\n"
1000             "to the rrdd(1) manual page.\n",
1001             VERSION);
1002         status = -1;
1003         break;
1004     } /* switch (option) */
1005   } /* while (getopt) */
1006
1007   return (status);
1008 } /* }}} int read_options */
1009
1010 int main (int argc, char **argv)
1011 {
1012   int status;
1013
1014   status = read_options (argc, argv);
1015   if (status != 0)
1016   {
1017     if (status < 0)
1018       status = 0;
1019     return (status);
1020   }
1021
1022   status = daemonize ();
1023   if (status == 1)
1024   {
1025     struct sigaction sigchld;
1026
1027     memset (&sigchld, 0, sizeof (sigchld));
1028     sigchld.sa_handler = SIG_IGN;
1029     sigaction (SIGCHLD, &sigchld, NULL);
1030
1031     return (0);
1032   }
1033   else if (status != 0)
1034   {
1035     fprintf (stderr, "daemonize failed, exiting.\n");
1036     return (1);
1037   }
1038
1039   listen_thread_main (NULL);
1040
1041   cleanup ();
1042
1043   return (0);
1044 } /* int main */
1045
1046 /*
1047  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1048  */