src/rrd_daemon.c: Remove the declaration of strdup.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 /*
120  * Variables
121  */
122 static listen_socket_t *listen_fds = NULL;
123 static size_t listen_fds_num = 0;
124
125 static int do_shutdown = 0;
126
127 static pthread_t queue_thread;
128
129 static pthread_t *connetion_threads = NULL;
130 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
131 static int connetion_threads_num = 0;
132
133 /* Cache stuff */
134 static GTree          *cache_tree = NULL;
135 static cache_item_t   *cache_queue_head = NULL;
136 static cache_item_t   *cache_queue_tail = NULL;
137 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
138 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
139
140 static int config_write_interval = 300;
141 static int config_flush_interval = 3600;
142
143 static char **config_listen_address_list = NULL;
144 static int config_listen_address_list_len = 0;
145
146 /* 
147  * Functions
148  */
149 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
150 {
151   do_shutdown++;
152 } /* }}} void sig_int_handler */
153
154 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
155 {
156   do_shutdown++;
157 } /* }}} void sig_term_handler */
158
159 /*
160  * enqueue_cache_item:
161  * `cache_lock' must be acquired before calling this function!
162  */
163 static int enqueue_cache_item (cache_item_t *ci) /* {{{ */
164 {
165   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
166       ci->file);
167
168   if (ci == NULL)
169     return (-1);
170
171   if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
172     return (-1);
173
174   assert (ci->next == NULL);
175
176   if (ci->values_num == 0)
177     return (0);
178
179   if (cache_queue_tail == NULL)
180     cache_queue_head = ci;
181   else
182     cache_queue_tail->next = ci;
183   cache_queue_tail = ci;
184
185   return (0);
186 } /* }}} int enqueue_cache_item */
187
188 /*
189  * tree_callback_flush:
190  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
191  * while this is in progress.
192  */
193 static gboolean tree_callback_flush (gpointer key /* {{{ */
194     __attribute__((unused)), gpointer value, gpointer data)
195 {
196   cache_item_t *ci;
197   time_t now;
198
199   key = NULL; /* make compiler happy */
200
201   ci = (cache_item_t *) value;
202   now = *((time_t *) data);
203
204   if (((now - ci->last_flush_time) >= config_write_interval)
205       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
206     enqueue_cache_item (ci);
207
208   return (TRUE);
209 } /* }}} gboolean tree_callback_flush */
210
211 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
212 {
213   struct timeval now;
214   struct timespec next_flush;
215
216   gettimeofday (&now, NULL);
217   next_flush.tv_sec = now.tv_sec + config_flush_interval;
218   next_flush.tv_nsec = 1000 * now.tv_usec;
219
220   pthread_mutex_lock (&cache_lock);
221   while ((do_shutdown == 0) || (cache_queue_head != NULL))
222   {
223     cache_item_t *ci;
224     char *file;
225     char **values;
226     int values_num;
227     int status;
228     int i;
229
230     /* First, check if it's time to do the cache flush. */
231     gettimeofday (&now, NULL);
232     if ((now.tv_sec > next_flush.tv_sec)
233         || ((now.tv_sec == next_flush.tv_sec)
234           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
235     {
236       time_t time_now;
237
238       /* Pass the current time as user data so that we don't need to call
239        * `time' for each node. */
240       time_now = time (NULL);
241
242       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
243
244       /* Determine the time of the next cache flush. */
245       while (next_flush.tv_sec < now.tv_sec)
246         next_flush.tv_sec += config_flush_interval;
247     }
248
249     /* Now, check if there's something to store away. If not, wait until
250      * something comes in or it's time to do the cache flush. */
251     if (cache_queue_head == NULL)
252     {
253       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
254       if ((status != 0) && (status != ETIMEDOUT))
255       {
256         RRDD_LOG (LOG_ERR, "queue_thread_main: "
257             "pthread_cond_timedwait returned %i.", status);
258       }
259     }
260
261     /* Check if a value has arrived. This may be NULL if we timed out or there
262      * was an interrupt such as a signal. */
263     if (cache_queue_head == NULL)
264       continue;
265
266     ci = cache_queue_head;
267
268     /* copy the relevant parts */
269     file = strdup (ci->file);
270     if (file == NULL)
271     {
272       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
273       continue;
274     }
275
276     values = ci->values;
277     values_num = ci->values_num;
278
279     ci->values = NULL;
280     ci->values_num = 0;
281
282     ci->last_flush_time = time (NULL);
283     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
284
285     cache_queue_head = ci->next;
286     if (cache_queue_head == NULL)
287       cache_queue_tail = NULL;
288     ci->next = NULL;
289
290     pthread_mutex_unlock (&cache_lock);
291
292     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
293         file, values_num, (void *) values);
294
295     status = rrd_update_r (file, NULL, values_num, (void *) values);
296     if (status != 0)
297     {
298       RRDD_LOG (LOG_ERR, "queue_thread_main: "
299           "rrd_update_r failed with status %i.",
300           status);
301     }
302
303     free (file);
304     for (i = 0; i < values_num; i++)
305       free (values[i]);
306
307     pthread_mutex_lock (&cache_lock);
308   } /* while (do_shutdown == 0) */
309   pthread_mutex_unlock (&cache_lock);
310
311   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
312
313   return (NULL);
314 } /* }}} void *queue_thread_main */
315
316 static int handle_request_update (int fd, /* {{{ */
317     char *buffer, int buffer_size __attribute__((unused)))
318 {
319   char *file;
320   char *value;
321   char *buffer_ptr;
322   int values_num = 0;
323   int status;
324
325   time_t now;
326
327   cache_item_t *ci;
328   char answer[4096];
329
330   now = time (NULL);
331
332   buffer_ptr = buffer;
333
334   file = buffer_ptr;
335   buffer_ptr += strlen (file) + 1;
336
337   pthread_mutex_lock (&cache_lock);
338
339   ci = g_tree_lookup (cache_tree, file);
340   if (ci == NULL)
341   {
342     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
343     if (ci == NULL)
344     {
345       pthread_mutex_unlock (&cache_lock);
346       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
347       return (-1);
348     }
349     memset (ci, 0, sizeof (cache_item_t));
350
351     ci->file = strdup (file);
352     if (ci->file == NULL)
353     {
354       pthread_mutex_unlock (&cache_lock);
355       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
356       free (ci);
357       return (-1);
358     }
359
360     ci->values = NULL;
361     ci->values_num = 0;
362     ci->last_flush_time = now;
363     ci->flags = CI_FLAGS_IN_TREE;
364
365     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
366
367     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
368         ci->file);
369   }
370   assert (ci != NULL);
371
372   while (*buffer_ptr != 0)
373   {
374     char **temp;
375
376     value = buffer_ptr;
377     buffer_ptr += strlen (value) + 1;
378
379     temp = (char **) realloc (ci->values,
380         sizeof (char *) * (ci->values_num + 1));
381     if (temp == NULL)
382     {
383       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
384       continue;
385     }
386     ci->values = temp;
387
388     ci->values[ci->values_num] = strdup (value);
389     if (ci->values[ci->values_num] == NULL)
390     {
391       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
392       continue;
393     }
394     ci->values_num++;
395
396     values_num++;
397   }
398
399   if (((now - ci->last_flush_time) >= config_write_interval)
400       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
401   {
402     enqueue_cache_item (ci);
403     pthread_cond_signal (&cache_cond);
404   }
405
406   pthread_mutex_unlock (&cache_lock);
407
408   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
409   answer[sizeof (answer) - 1] = 0;
410
411   status = write (fd, answer, sizeof (answer));
412   if (status < 0)
413   {
414     status = errno;
415     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
416     return (status);
417   }
418
419   return (0);
420 } /* }}} int handle_request_update */
421
422 static int handle_request (int fd) /* {{{ */
423 {
424   char buffer[4096];
425   int buffer_size;
426
427   buffer_size = read (fd, buffer, sizeof (buffer));
428   if (buffer_size < 1)
429   {
430     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
431     return (-1);
432   }
433   assert (((size_t) buffer_size) <= sizeof (buffer));
434
435   if ((buffer[buffer_size - 2] != 0)
436       || (buffer[buffer_size - 1] != 0))
437   {
438     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
439     return (-1);
440   }
441
442   /* fields in the buffer a separated by null bytes. */
443   if (strcmp (buffer, "update") == 0)
444   {
445     int offset = strlen ("update") + 1;
446     return (handle_request_update (fd, buffer + offset,
447           buffer_size - offset));
448   }
449   else
450   {
451     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
452     return (-1);
453   }
454 } /* }}} int handle_request */
455
456 static void *connection_thread_main (void *args /* {{{ */
457     __attribute__((unused)))
458 {
459   pthread_t self;
460   int i;
461   int fd;
462   
463   fd = *((int *) args);
464
465   pthread_mutex_lock (&connetion_threads_lock);
466   {
467     pthread_t *temp;
468
469     temp = (pthread_t *) realloc (connetion_threads,
470         sizeof (pthread_t) * (connetion_threads_num + 1));
471     if (temp == NULL)
472     {
473       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
474     }
475     else
476     {
477       connetion_threads = temp;
478       connetion_threads[connetion_threads_num] = pthread_self ();
479       connetion_threads_num++;
480     }
481   }
482   pthread_mutex_unlock (&connetion_threads_lock);
483
484   while (do_shutdown == 0)
485   {
486     struct pollfd pollfd;
487     int status;
488
489     pollfd.fd = fd;
490     pollfd.events = POLLIN | POLLPRI;
491     pollfd.revents = 0;
492
493     status = poll (&pollfd, 1, /* timeout = */ 500);
494     if (status == 0) /* timeout */
495       continue;
496     else if (status < 0) /* error */
497     {
498       status = errno;
499       if (status == EINTR)
500         continue;
501       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
502       continue;
503     }
504
505     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
506     {
507       close (fd);
508       break;
509     }
510     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
511     {
512       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
513           "poll(2) returned something unexpected: %#04hx",
514           pollfd.revents);
515       close (fd);
516       break;
517     }
518
519     status = handle_request (fd);
520     if (status != 0)
521     {
522       close (fd);
523       break;
524     }
525   }
526
527   self = pthread_self ();
528   /* Remove this thread from the connection threads list */
529   pthread_mutex_lock (&connetion_threads_lock);
530   /* Find out own index in the array */
531   for (i = 0; i < connetion_threads_num; i++)
532     if (pthread_equal (connetion_threads[i], self) != 0)
533       break;
534   assert (i < connetion_threads_num);
535
536   /* Move the trailing threads forward. */
537   if (i < (connetion_threads_num - 1))
538   {
539     memmove (connetion_threads + i,
540         connetion_threads + i + 1,
541         sizeof (pthread_t) * (connetion_threads_num - i - 1));
542   }
543
544   connetion_threads_num--;
545   pthread_mutex_unlock (&connetion_threads_lock);
546
547   free (args);
548   return (NULL);
549 } /* }}} void *connection_thread_main */
550
551 static int open_listen_socket_unix (const char *path) /* {{{ */
552 {
553   int fd;
554   struct sockaddr_un sa;
555   listen_socket_t *temp;
556   int status;
557
558   temp = (listen_socket_t *) realloc (listen_fds,
559       sizeof (listen_fds[0]) * (listen_fds_num + 1));
560   if (temp == NULL)
561   {
562     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
563     return (-1);
564   }
565   listen_fds = temp;
566   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
567
568   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
569   if (fd < 0)
570   {
571     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
572     return (-1);
573   }
574
575   memset (&sa, 0, sizeof (sa));
576   sa.sun_family = AF_UNIX;
577   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
578
579   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
580   if (status != 0)
581   {
582     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
583     close (fd);
584     unlink (path);
585     return (-1);
586   }
587
588   status = listen (fd, /* backlog = */ 10);
589   if (status != 0)
590   {
591     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
592     close (fd);
593     unlink (path);
594     return (-1);
595   }
596   
597   listen_fds[listen_fds_num].fd = fd;
598   snprintf (listen_fds[listen_fds_num].path,
599       sizeof (listen_fds[listen_fds_num].path) - 1,
600       "unix:%s", path);
601   listen_fds_num++;
602
603   return (0);
604 } /* }}} int open_listen_socket_unix */
605
606 static int open_listen_socket (const char *addr) /* {{{ */
607 {
608   struct addrinfo ai_hints;
609   struct addrinfo *ai_res;
610   struct addrinfo *ai_ptr;
611   int status;
612
613   assert (addr != NULL);
614
615   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
616     return (open_listen_socket_unix (addr + strlen ("unix:")));
617   else if (addr[0] == '/')
618     return (open_listen_socket_unix (addr));
619
620   memset (&ai_hints, 0, sizeof (ai_hints));
621   ai_hints.ai_flags = 0;
622 #ifdef AI_ADDRCONFIG
623   ai_hints.ai_flags |= AI_ADDRCONFIG;
624 #endif
625   ai_hints.ai_family = AF_UNSPEC;
626   ai_hints.ai_socktype = SOCK_STREAM;
627
628   ai_res = NULL;
629   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
630   if (status != 0)
631   {
632     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
633         "%s", addr, gai_strerror (status));
634     return (-1);
635   }
636
637   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
638   {
639     int fd;
640     listen_socket_t *temp;
641
642     temp = (listen_socket_t *) realloc (listen_fds,
643         sizeof (listen_fds[0]) * (listen_fds_num + 1));
644     if (temp == NULL)
645     {
646       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
647       continue;
648     }
649     listen_fds = temp;
650     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
651
652     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
653     if (fd < 0)
654     {
655       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
656       continue;
657     }
658
659     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
660     if (status != 0)
661     {
662       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
663       close (fd);
664       continue;
665     }
666
667     status = listen (fd, /* backlog = */ 10);
668     if (status != 0)
669     {
670       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
671       close (fd);
672       return (-1);
673     }
674
675     listen_fds[listen_fds_num].fd = fd;
676     strncpy (listen_fds[listen_fds_num].path, addr,
677         sizeof (listen_fds[listen_fds_num].path) - 1);
678     listen_fds_num++;
679   } /* for (ai_ptr) */
680
681   return (0);
682 } /* }}} int open_listen_socket */
683
684 static int close_listen_sockets (void) /* {{{ */
685 {
686   size_t i;
687
688   for (i = 0; i < listen_fds_num; i++)
689   {
690     close (listen_fds[i].fd);
691     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
692       unlink (listen_fds[i].path + strlen ("unix:"));
693   }
694
695   free (listen_fds);
696   listen_fds = NULL;
697   listen_fds_num = 0;
698
699   return (0);
700 } /* }}} int close_listen_sockets */
701
702 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
703 {
704   struct pollfd *pollfds;
705   int pollfds_num;
706   int status;
707   int i;
708
709   for (i = 0; i < config_listen_address_list_len; i++)
710   {
711     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
712         "= %s", i, config_listen_address_list[i]);
713     open_listen_socket (config_listen_address_list[i]);
714   }
715
716   if (config_listen_address_list_len < 1)
717     open_listen_socket (RRDD_SOCK_PATH);
718
719   if (listen_fds_num < 1)
720   {
721     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
722         "could be opened. Sorry.");
723     return (NULL);
724   }
725
726   pollfds_num = listen_fds_num;
727   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
728   if (pollfds == NULL)
729   {
730     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
731     return (NULL);
732   }
733   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
734
735   while (do_shutdown == 0)
736   {
737     assert (pollfds_num == ((int) listen_fds_num));
738     for (i = 0; i < pollfds_num; i++)
739     {
740       pollfds[i].fd = listen_fds[i].fd;
741       pollfds[i].events = POLLIN | POLLPRI;
742       pollfds[i].revents = 0;
743     }
744
745     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
746     if (status < 1)
747     {
748       status = errno;
749       if (status != EINTR)
750       {
751         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
752       }
753       continue;
754     }
755
756     for (i = 0; i < pollfds_num; i++)
757     {
758       int *client_sd;
759       struct sockaddr_storage client_sa;
760       socklen_t client_sa_size;
761       pthread_t tid;
762
763       if (pollfds[i].revents == 0)
764         continue;
765
766       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
767       {
768         RRDD_LOG (LOG_ERR, "listen_thread_main: "
769             "poll(2) returned something unexpected for listen FD #%i.",
770             pollfds[i].fd);
771         continue;
772       }
773
774       client_sd = (int *) malloc (sizeof (int));
775       if (client_sd == NULL)
776       {
777         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
778         continue;
779       }
780
781       client_sa_size = sizeof (client_sa);
782       *client_sd = accept (pollfds[i].fd,
783           (struct sockaddr *) &client_sa, &client_sa_size);
784       if (*client_sd < 0)
785       {
786         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
787         continue;
788       }
789
790       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
791           /* args = */ (void *) client_sd);
792       if (status != 0)
793       {
794         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
795         close (*client_sd);
796         free (client_sd);
797         continue;
798       }
799     } /* for (pollfds_num) */
800   } /* while (do_shutdown == 0) */
801
802   close_listen_sockets ();
803
804   pthread_mutex_lock (&connetion_threads_lock);
805   while (connetion_threads_num > 0)
806   {
807     pthread_t wait_for;
808
809     wait_for = connetion_threads[0];
810
811     pthread_mutex_unlock (&connetion_threads_lock);
812     pthread_join (wait_for, /* retval = */ NULL);
813     pthread_mutex_lock (&connetion_threads_lock);
814   }
815   pthread_mutex_unlock (&connetion_threads_lock);
816
817   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
818
819   return (NULL);
820 } /* }}} void *listen_thread_main */
821
822 static int daemonize (void) /* {{{ */
823 {
824   pid_t child;
825   int status;
826
827   child = fork ();
828   if (child < 0)
829   {
830     fprintf (stderr, "daemonize: fork(2) failed.\n");
831     return (-1);
832   }
833   else if (child > 0)
834   {
835     return (1);
836   }
837
838   /* Change into the /tmp directory. */
839   chdir ("/tmp");
840
841   /* Become session leader */
842   setsid ();
843
844   /* Open the first three file descriptors to /dev/null */
845   close (2);
846   close (1);
847   close (0);
848
849   open ("/dev/null", O_RDWR);
850   dup (0);
851   dup (0);
852
853   {
854     struct sigaction sa;
855
856     memset (&sa, 0, sizeof (sa));
857     sa.sa_handler = sig_int_handler;
858     sigaction (SIGINT, &sa, NULL);
859
860     memset (&sa, 0, sizeof (sa));
861     sa.sa_handler = sig_term_handler;
862     sigaction (SIGINT, &sa, NULL);
863
864     memset (&sa, 0, sizeof (sa));
865     sa.sa_handler = SIG_IGN;
866     sigaction (SIGPIPE, &sa, NULL);
867   }
868
869   openlog ("rrdd", LOG_PID, LOG_DAEMON);
870
871   cache_tree = g_tree_new ((GCompareFunc) strcmp);
872   if (cache_tree == NULL)
873   {
874     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
875     return (-1);
876   }
877
878   memset (&queue_thread, 0, sizeof (queue_thread));
879   status = pthread_create (&queue_thread, /* attr = */ NULL,
880       queue_thread_main, /* args = */ NULL);
881   if (status != 0)
882   {
883     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
884     return (-1);
885   }
886
887   return (0);
888 } /* }}} int daemonize */
889
890 static int cleanup (void) /* {{{ */
891 {
892   RRDD_LOG (LOG_DEBUG, "cleanup ()");
893
894   do_shutdown++;
895
896   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
897   pthread_cond_signal (&cache_cond);
898   pthread_join (queue_thread, /* return = */ NULL);
899   RRDD_LOG (LOG_DEBUG, "cleanup: done");
900
901   closelog ();
902
903   return (0);
904 } /* }}} int cleanup */
905
906 static int read_options (int argc, char **argv) /* {{{ */
907 {
908   int option;
909   int status = 0;
910
911   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
912   {
913     switch (option)
914     {
915       case 'l':
916       {
917         char **temp;
918
919         temp = (char **) realloc (config_listen_address_list,
920             sizeof (char *) * (config_listen_address_list_len + 1));
921         if (temp == NULL)
922         {
923           fprintf (stderr, "read_options: realloc failed.\n");
924           return (2);
925         }
926         config_listen_address_list = temp;
927
928         temp[config_listen_address_list_len] = strdup (optarg);
929         if (temp[config_listen_address_list_len] == NULL)
930         {
931           fprintf (stderr, "read_options: strdup failed.\n");
932           return (2);
933         }
934         config_listen_address_list_len++;
935       }
936       break;
937
938       case 'f':
939       {
940         int temp;
941
942         temp = atoi (optarg);
943         if (temp > 0)
944           config_flush_interval = temp;
945         else
946         {
947           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
948           status = 3;
949         }
950       }
951       break;
952
953       case 'w':
954       {
955         int temp;
956
957         temp = atoi (optarg);
958         if (temp > 0)
959           config_write_interval = temp;
960         else
961         {
962           fprintf (stderr, "Invalid write interval: %s\n", optarg);
963           status = 2;
964         }
965       }
966       break;
967
968       case 'h':
969       case '?':
970         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
971             "\n"
972             "Usage: rrdd [options]\n"
973             "\n"
974             "Valid options are:\n"
975             "  -l <address>  Socket address to listen to.\n"
976             "  -w <seconds>  Interval in which to write data.\n"
977             "  -f <seconds>  Interval in which to flush dead data.\n"
978             "\n"
979             "For more information and a detailed description of all options "
980             "please refer\n"
981             "to the rrdd(1) manual page.\n",
982             VERSION);
983         status = -1;
984         break;
985     } /* switch (option) */
986   } /* while (getopt) */
987
988   return (status);
989 } /* }}} int read_options */
990
991 int main (int argc, char **argv)
992 {
993   int status;
994
995   status = read_options (argc, argv);
996   if (status != 0)
997   {
998     if (status < 0)
999       status = 0;
1000     return (status);
1001   }
1002
1003   status = daemonize ();
1004   if (status == 1)
1005   {
1006     struct sigaction sigchld;
1007
1008     memset (&sigchld, 0, sizeof (sigchld));
1009     sigchld.sa_handler = SIG_IGN;
1010     sigaction (SIGCHLD, &sigchld, NULL);
1011
1012     return (0);
1013   }
1014   else if (status != 0)
1015   {
1016     fprintf (stderr, "daemonize failed, exiting.\n");
1017     return (1);
1018   }
1019
1020   listen_thread_main (NULL);
1021
1022   cleanup ();
1023
1024   return (0);
1025 } /* int main */
1026
1027 /*
1028  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1029  */