src/rrdd.c et al.: Use the tree implementation from GLib 2.0.
[rrdd.git] / src / rrdd.c
1 /**
2  * collectd - src/rrdd.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 #define RRDD_DEBUG 1
23
24 #include "rrdd.h"
25 #include <glib-2.0/glib.h>
26 #include <rrd.h>
27
28 #if RRDD_DEBUG
29 # define RRDD_LOG(severity, ...) do { fprintf (stderr, __VA_ARGS__); fprintf (stderr, "\n"); } while (0)
30 #else
31 # define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
32 #endif
33
34 /*
35  * Types
36  */
37 struct listen_socket_s
38 {
39   int fd;
40   char path[PATH_MAX + 1];
41 };
42 typedef struct listen_socket_s listen_socket_t;
43
44 struct cache_item_s;
45 typedef struct cache_item_s cache_item_t;
46 struct cache_item_s
47 {
48   char *file;
49   char **values;
50   int values_num;
51   time_t last_flush_time;
52 #define CI_FLAGS_IN_TREE  0x01
53 #define CI_FLAGS_IN_QUEUE 0x02
54   int flags;
55
56   cache_item_t *next;
57 };
58
59 /*
60  * Variables
61  */
62 static listen_socket_t *listen_fds = NULL;
63 static size_t listen_fds_num = 0;
64
65 static int do_shutdown = 0;
66
67 static pthread_t queue_thread;
68
69 static pthread_t *connetion_threads = NULL;
70 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
71 static int connetion_threads_num = 0;
72
73 /* Cache stuff */
74 static GTree          *cache_tree = NULL;
75 static cache_item_t   *cache_queue_head = NULL;
76 static cache_item_t   *cache_queue_tail = NULL;
77 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
78 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
79
80 static int config_write_interval = 300;
81 static int config_flush_interval = 3600;
82
83 static char **config_listen_address_list = NULL;
84 static int config_listen_address_list_len = 0;
85
86 /* 
87  * Functions
88  */
89 static void sig_int_handler (int signal) /* {{{ */
90 {
91   do_shutdown++;
92 } /* }}} void sig_int_handler */
93
94 static void sig_term_handler (int signal) /* {{{ */
95 {
96   do_shutdown++;
97 } /* }}} void sig_term_handler */
98
99 static int cache_tree_compare (const void *v0, const void *v1) /* {{{ */
100 {
101   cache_item_t *c0 = (cache_item_t *) v0;
102   cache_item_t *c1 = (cache_item_t *) v1;
103
104   assert (c0->file != NULL);
105   assert (c1->file != NULL);
106
107   return (strcmp (c0->file, c1->file));
108 } /* }}} int cache_tree_compare */
109
110 static void cache_tree_free (void *v) /* {{{ */
111 {
112   cache_item_t *c = (cache_item_t *) v;
113
114   assert (c->values_num == 0);
115   assert ((c->flags & CI_FLAGS_IN_TREE) != 0);
116   assert ((c->flags & CI_FLAGS_IN_QUEUE) == 0);
117
118   free (c->file);
119   c->file = NULL;
120   free (c);
121 } /* }}} void cache_tree_free */
122
123 static void *queue_thread_main (void *args) /* {{{ */
124 {
125   pthread_mutex_lock (&cache_lock);
126   while ((do_shutdown == 0) || (cache_queue_head != NULL))
127   {
128     cache_item_t *ci;
129
130     char *file;
131     char **values;
132     int values_num;
133     int status;
134     int i;
135
136     if (cache_queue_head == NULL)
137       pthread_cond_wait (&cache_cond, &cache_lock);
138
139     if (cache_queue_head == NULL)
140       continue;
141
142     ci = cache_queue_head;
143
144     /* copy the relevant parts */
145     file = strdup (ci->file);
146     if (file == NULL)
147     {
148       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
149       continue;
150     }
151
152     values = ci->values;
153     values_num = ci->values_num;
154
155     ci->values = NULL;
156     ci->values_num = 0;
157
158     ci->last_flush_time = time (NULL);
159     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
160
161     cache_queue_head = ci->next;
162     if (cache_queue_head == NULL)
163       cache_queue_tail = NULL;
164     ci->next = NULL;
165
166     pthread_mutex_unlock (&cache_lock);
167
168     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
169         file, values_num, (void *) values);
170
171     status = rrd_update_r (file, NULL, values_num, (void *) values);
172     if (status != 0)
173     {
174       RRDD_LOG (LOG_ERR, "queue_thread_main: "
175           "rrd_update_r failed with status %i.",
176           status);
177     }
178
179     free (file);
180     for (i = 0; i < values_num; i++)
181       free (values[i]);
182
183     pthread_mutex_lock (&cache_lock);
184   } /* while (do_shutdown == 0) */
185   pthread_mutex_unlock (&cache_lock);
186
187   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
188
189   return (NULL);
190 } /* }}} void *queue_thread_main */
191
192 static int handle_request_update (int fd, /* {{{ */
193     char *buffer, int buffer_size)
194 {
195   char *file;
196   char *value;
197   char *buffer_ptr;
198   int values_num = 0;
199   int status;
200
201   time_t now;
202
203   cache_item_t *ci;
204   char answer[4096];
205
206   now = time (NULL);
207
208   RRDD_LOG (LOG_DEBUG, "handle_request_update (%i, %p, %i)",
209       fd, (void *) buffer, buffer_size);
210
211   buffer_ptr = buffer;
212
213   file = buffer_ptr;
214   buffer_ptr += strlen (file) + 1;
215
216   pthread_mutex_lock (&cache_lock);
217
218   ci = g_tree_lookup (cache_tree, file);
219   if (ci == NULL)
220   {
221     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
222     if (ci == NULL)
223     {
224       pthread_mutex_unlock (&cache_lock);
225       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
226       return (-1);
227     }
228     memset (ci, 0, sizeof (cache_item_t));
229
230     ci->file = strdup (file);
231     if (ci->file == NULL)
232     {
233       pthread_mutex_unlock (&cache_lock);
234       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
235       free (ci);
236       return (-1);
237     }
238
239     ci->values = NULL;
240     ci->values_num = 0;
241     ci->last_flush_time = now;
242     ci->flags = CI_FLAGS_IN_TREE;
243
244     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
245
246     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
247         ci->file);
248   }
249   assert (ci != NULL);
250
251   while (*buffer_ptr != 0)
252   {
253     char **temp;
254
255     value = buffer_ptr;
256     buffer_ptr += strlen (value) + 1;
257
258     temp = (char **) realloc (ci->values,
259         sizeof (char *) * (ci->values_num + 1));
260     if (temp == NULL)
261     {
262       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
263       continue;
264     }
265     ci->values = temp;
266
267     ci->values[ci->values_num] = strdup (value);
268     if (ci->values[ci->values_num] == NULL)
269     {
270       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
271       continue;
272     }
273     ci->values_num++;
274
275     values_num++;
276   }
277
278   if (((now - ci->last_flush_time) >= config_write_interval)
279       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0))
280   {
281     RRDD_LOG (LOG_DEBUG, "handle_request_update: Adding %s to the update queue.",
282         ci->file);
283
284     assert (ci->next == NULL);
285
286     if (cache_queue_tail == NULL)
287       cache_queue_head = ci;
288     else
289       cache_queue_tail->next = ci;
290     cache_queue_tail = ci;
291
292     pthread_cond_signal (&cache_cond);
293   }
294
295   pthread_mutex_unlock (&cache_lock);
296
297   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
298   answer[sizeof (answer) - 1] = 0;
299
300   status = write (fd, answer, sizeof (answer));
301   if (status < 0)
302   {
303     status = errno;
304     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
305     return (status);
306   }
307
308   return (0);
309 } /* }}} int handle_request_update */
310
311 static int handle_request (int fd) /* {{{ */
312 {
313   char buffer[4096];
314   int buffer_size;
315
316   RRDD_LOG (LOG_DEBUG, "handle_request (%i)", fd);
317
318   buffer_size = read (fd, buffer, sizeof (buffer));
319   if (buffer_size < 1)
320   {
321     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
322     return (-1);
323   }
324   assert (((size_t) buffer_size) <= sizeof (buffer));
325
326   if ((buffer[buffer_size - 2] != 0)
327       || (buffer[buffer_size - 1] != 0))
328   {
329     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
330     return (-1);
331   }
332
333   /* fields in the buffer a separated by null bytes. */
334   if (strcmp (buffer, "update") == 0)
335   {
336     int offset = strlen ("update") + 1;
337     return (handle_request_update (fd, buffer + offset,
338           buffer_size - offset));
339   }
340   else
341   {
342     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
343     return (-1);
344   }
345 } /* }}} int handle_request */
346
347 static void *connection_thread_main (void *args) /* {{{ */
348 {
349   pthread_t self;
350   int i;
351   int fd;
352   
353   fd = *((int *) args);
354
355   RRDD_LOG (LOG_DEBUG, "connection_thread_main: Adding myself to "
356       "connetion_threads[]..");
357   pthread_mutex_lock (&connetion_threads_lock);
358   {
359     pthread_t *temp;
360
361     temp = (pthread_t *) realloc (connetion_threads,
362         sizeof (pthread_t) * (connetion_threads_num + 1));
363     if (temp == NULL)
364     {
365       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
366     }
367     else
368     {
369       connetion_threads = temp;
370       connetion_threads[connetion_threads_num] = pthread_self ();
371       connetion_threads_num++;
372     }
373   }
374   pthread_mutex_unlock (&connetion_threads_lock);
375   RRDD_LOG (LOG_DEBUG, "connection_thread_main: done");
376
377   while (do_shutdown == 0)
378   {
379     struct pollfd pollfd;
380     int status;
381
382     pollfd.fd = fd;
383     pollfd.events = POLLIN | POLLPRI;
384     pollfd.revents = 0;
385
386     status = poll (&pollfd, 1, /* timeout = */ 500);
387     if (status == 0) /* timeout */
388       continue;
389     else if (status < 0) /* error */
390     {
391       status = errno;
392       if (status == EINTR)
393         continue;
394       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
395       continue;
396     }
397
398     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
399     {
400       RRDD_LOG (LOG_DEBUG, "connection_thread_main: "
401           "poll(2) returned POLLHUP.");
402       close (fd);
403       break;
404     }
405     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
406     {
407       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
408           "poll(2) returned something unexpected: %#04hx",
409           pollfd.revents);
410       close (fd);
411       break;
412     }
413
414     status = handle_request (fd);
415     if (status != 0)
416     {
417       close (fd);
418       break;
419     }
420   }
421
422   self = pthread_self ();
423   /* Remove this thread from the connection threads list */
424   pthread_mutex_lock (&connetion_threads_lock);
425   /* Find out own index in the array */
426   for (i = 0; i < connetion_threads_num; i++)
427     if (pthread_equal (connetion_threads[i], self) != 0)
428       break;
429   assert (i < connetion_threads_num);
430
431   /* Move the trailing threads forward. */
432   if (i < (connetion_threads_num - 1))
433   {
434     memmove (connetion_threads + i,
435         connetion_threads + i + 1,
436         sizeof (pthread_t) * (connetion_threads_num - i - 1));
437   }
438
439   connetion_threads_num--;
440   pthread_mutex_unlock (&connetion_threads_lock);
441
442   free (args);
443   return (NULL);
444 } /* }}} void *connection_thread_main */
445
446 static int open_listen_socket_unix (const char *path) /* {{{ */
447 {
448   int fd;
449   struct sockaddr_un sa;
450   listen_socket_t *temp;
451   int status;
452
453   temp = (listen_socket_t *) realloc (listen_fds,
454       sizeof (listen_fds[0]) * (listen_fds_num + 1));
455   if (temp == NULL)
456   {
457     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
458     return (-1);
459   }
460   listen_fds = temp;
461   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
462
463   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
464   if (fd < 0)
465   {
466     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
467     return (-1);
468   }
469
470   memset (&sa, 0, sizeof (sa));
471   sa.sun_family = AF_UNIX;
472   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
473
474   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
475   if (status != 0)
476   {
477     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
478     close (fd);
479     unlink (path);
480     return (-1);
481   }
482
483   status = listen (fd, /* backlog = */ 10);
484   if (status != 0)
485   {
486     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
487     close (fd);
488     unlink (path);
489     return (-1);
490   }
491   
492   listen_fds[listen_fds_num].fd = fd;
493   snprintf (listen_fds[listen_fds_num].path,
494       sizeof (listen_fds[listen_fds_num].path) - 1,
495       "unix:%s", path);
496   listen_fds_num++;
497
498   return (0);
499 } /* }}} int open_listen_socket_unix */
500
501 static int open_listen_socket (const char *addr) /* {{{ */
502 {
503   struct addrinfo ai_hints;
504   struct addrinfo *ai_res;
505   struct addrinfo *ai_ptr;
506   int status;
507
508   assert (addr != NULL);
509
510   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
511     return (open_listen_socket_unix (addr + strlen ("unix:")));
512   else if (addr[0] == '/')
513     return (open_listen_socket_unix (addr));
514
515   memset (&ai_hints, 0, sizeof (ai_hints));
516   ai_hints.ai_flags = 0;
517 #ifdef AI_ADDRCONFIG
518   ai_hints.ai_flags |= AI_ADDRCONFIG;
519 #endif
520   ai_hints.ai_family = AF_UNSPEC;
521   ai_hints.ai_socktype = SOCK_STREAM;
522
523   ai_res = NULL;
524   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
525   if (status != 0)
526   {
527     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
528         "%s", addr, gai_strerror (status));
529     return (-1);
530   }
531
532   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
533   {
534     int fd;
535     listen_socket_t *temp;
536
537     temp = (listen_socket_t *) realloc (listen_fds,
538         sizeof (listen_fds[0]) * (listen_fds_num + 1));
539     if (temp == NULL)
540     {
541       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
542       continue;
543     }
544     listen_fds = temp;
545     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
546
547     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
548     if (fd < 0)
549     {
550       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
551       continue;
552     }
553
554     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
555     if (status != 0)
556     {
557       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
558       close (fd);
559       continue;
560     }
561
562     status = listen (fd, /* backlog = */ 10);
563     if (status != 0)
564     {
565       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
566       close (fd);
567       return (-1);
568     }
569
570     listen_fds[listen_fds_num].fd = fd;
571     strncpy (listen_fds[listen_fds_num].path, addr,
572         sizeof (listen_fds[listen_fds_num].path) - 1);
573     listen_fds_num++;
574   } /* for (ai_ptr) */
575
576   return (0);
577 } /* }}} int open_listen_socket */
578
579 static int close_listen_sockets (void) /* {{{ */
580 {
581   size_t i;
582
583   for (i = 0; i < listen_fds_num; i++)
584   {
585     close (listen_fds[i].fd);
586     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
587       unlink (listen_fds[i].path + strlen ("unix:"));
588   }
589
590   free (listen_fds);
591   listen_fds = NULL;
592   listen_fds_num = 0;
593
594   return (0);
595 } /* }}} int close_listen_sockets */
596
597 static void *listen_thread_main (void *args) /* {{{ */
598 {
599   char buffer[4096];
600   struct pollfd *pollfds;
601   int pollfds_num;
602   int status;
603   int i;
604
605   for (i = 0; i < config_listen_address_list_len; i++)
606   {
607     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
608         "= %s", i, config_listen_address_list[i]);
609     open_listen_socket (config_listen_address_list[i]);
610   }
611
612   if (config_listen_address_list_len < 1)
613     open_listen_socket (RRDD_SOCK_PATH);
614
615   if (listen_fds_num < 1)
616   {
617     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
618         "could be opened. Sorry.");
619     return (NULL);
620   }
621
622   pollfds_num = listen_fds_num;
623   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
624   if (pollfds == NULL)
625   {
626     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
627     return (NULL);
628   }
629   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
630
631   while (do_shutdown == 0)
632   {
633     assert (pollfds_num == listen_fds_num);
634     for (i = 0; i < pollfds_num; i++)
635     {
636       pollfds[i].fd = listen_fds[i].fd;
637       pollfds[i].events = POLLIN | POLLPRI;
638       pollfds[i].revents = 0;
639     }
640
641     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
642     if (status < 1)
643     {
644       status = errno;
645       if (status != EINTR)
646       {
647         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
648       }
649       continue;
650     }
651
652     for (i = 0; i < pollfds_num; i++)
653     {
654       int *client_sd;
655       struct sockaddr_storage client_sa;
656       socklen_t client_sa_size;
657       pthread_t tid;
658
659       if (pollfds[i].revents == 0)
660         continue;
661
662       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
663       {
664         RRDD_LOG (LOG_ERR, "listen_thread_main: "
665             "poll(2) returned something unexpected for listen FD #%i.",
666             pollfds[i].fd);
667         continue;
668       }
669
670       client_sd = (int *) malloc (sizeof (int));
671       if (client_sd == NULL)
672       {
673         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
674         continue;
675       }
676
677       client_sa_size = sizeof (client_sa);
678       *client_sd = accept (pollfds[i].fd,
679           (struct sockaddr *) &client_sa, &client_sa_size);
680       if (*client_sd < 0)
681       {
682         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
683         continue;
684       }
685
686       RRDD_LOG (LOG_DEBUG, "listen_thread_main: accept(2) returned fd #%i.",
687           *client_sd);
688
689       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
690           /* args = */ (void *) client_sd);
691       if (status != 0)
692       {
693         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
694         close (*client_sd);
695         free (client_sd);
696         continue;
697       }
698
699       RRDD_LOG (LOG_DEBUG, "listen_thread_main: pthread_create succeeded: "
700           "tid = %lu",
701           *((unsigned long *) &tid));
702     } /* for (pollfds_num) */
703   } /* while (do_shutdown == 0) */
704
705   close_listen_sockets ();
706
707   pthread_mutex_lock (&connetion_threads_lock);
708   while (connetion_threads_num > 0)
709   {
710     pthread_t wait_for;
711
712     wait_for = connetion_threads[0];
713
714     pthread_mutex_unlock (&connetion_threads_lock);
715     pthread_join (wait_for, /* retval = */ NULL);
716     pthread_mutex_lock (&connetion_threads_lock);
717   }
718   pthread_mutex_unlock (&connetion_threads_lock);
719
720   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
721
722   return (NULL);
723 } /* }}} void *listen_thread_main */
724
725 static int daemonize (void) /* {{{ */
726 {
727   pid_t child;
728   int status;
729
730 #if !RRDD_DEBUG
731   child = fork ();
732   if (child < 0)
733   {
734     fprintf (stderr, "daemonize: fork(2) failed.\n");
735     return (-1);
736   }
737   else if (child > 0)
738   {
739     return (1);
740   }
741
742   /* Change into the /tmp directory. */
743   chdir ("/tmp");
744
745   /* Become session leader */
746   setsid ();
747
748   /* Open the first three file descriptors to /dev/null */
749   close (2);
750   close (1);
751   close (0);
752
753   open ("/dev/null", O_RDWR);
754   dup (0);
755   dup (0);
756 #endif /* RRDD_DEBUG */
757
758   {
759     struct sigaction sa;
760
761     memset (&sa, 0, sizeof (sa));
762     sa.sa_handler = sig_int_handler;
763     sigaction (SIGINT, &sa, NULL);
764
765     memset (&sa, 0, sizeof (sa));
766     sa.sa_handler = sig_term_handler;
767     sigaction (SIGINT, &sa, NULL);
768
769     memset (&sa, 0, sizeof (sa));
770     sa.sa_handler = SIG_IGN;
771     sigaction (SIGPIPE, &sa, NULL);
772   }
773
774   openlog ("rrdd", LOG_PID, LOG_DAEMON);
775
776   cache_tree = g_tree_new ((GCompareFunc) strcmp);
777   if (cache_tree == NULL)
778   {
779     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
780     return (-1);
781   }
782
783   memset (&queue_thread, 0, sizeof (queue_thread));
784   status = pthread_create (&queue_thread, /* attr = */ NULL,
785       queue_thread_main, /* args = */ NULL);
786   if (status != 0)
787   {
788     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
789     return (-1);
790   }
791
792   return (0);
793 } /* }}} int daemonize */
794
795 static int cleanup (void) /* {{{ */
796 {
797   RRDD_LOG (LOG_DEBUG, "cleanup ()");
798
799   do_shutdown++;
800
801   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
802   pthread_cond_signal (&cache_cond);
803   pthread_join (queue_thread, /* return = */ NULL);
804   RRDD_LOG (LOG_DEBUG, "cleanup: done");
805
806   closelog ();
807
808   return (0);
809 } /* }}} int cleanup */
810
811 static int read_options (int argc, char **argv) /* {{{ */
812 {
813   int option;
814   int status = 0;
815
816   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
817   {
818     switch (option)
819     {
820       case 'l':
821       {
822         char **temp;
823
824         temp = (char **) realloc (config_listen_address_list,
825             sizeof (char *) * (config_listen_address_list_len + 1));
826         if (temp == NULL)
827         {
828           fprintf (stderr, "read_options: realloc failed.\n");
829           return (2);
830         }
831         config_listen_address_list = temp;
832
833         temp[config_listen_address_list_len] = strdup (optarg);
834         if (temp[config_listen_address_list_len] == NULL)
835         {
836           fprintf (stderr, "read_options: strdup failed.\n");
837           return (2);
838         }
839         config_listen_address_list_len++;
840       }
841       break;
842
843       case 'f':
844       {
845         int temp;
846
847         temp = atoi (optarg);
848         if (temp > 0)
849           config_flush_interval = temp;
850         else
851         {
852           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
853           status = 3;
854         }
855       }
856       break;
857
858       case 'w':
859       {
860         int temp;
861
862         temp = atoi (optarg);
863         if (temp > 0)
864           config_write_interval = temp;
865         else
866         {
867           fprintf (stderr, "Invalid write interval: %s\n", optarg);
868           status = 2;
869         }
870       }
871       break;
872
873       case 'h':
874       case '?':
875         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
876             "\n"
877             "Usage: rrdd [options]\n"
878             "\n"
879             "Valid options are:\n"
880             "  -l <address>  Socket address to listen to.\n"
881             "  -w <seconds>  Interval in which to write data.\n"
882             "  -f <seconds>  Interval in which to flush dead data.\n"
883             "\n"
884             "For more information and a detailed description of all options "
885             "please refer\n"
886             "to the rrdd(1) manual page.\n",
887             PACKAGE_VERSION);
888         status = -1;
889         break;
890     } /* switch (option) */
891   } /* while (getopt) */
892
893   return (status);
894 } /* }}} int read_options */
895
896 int main (int argc, char **argv)
897 {
898   int status;
899
900   status = read_options (argc, argv);
901   if (status != 0)
902   {
903     if (status < 0)
904       status = 0;
905     return (status);
906   }
907
908   status = daemonize ();
909   if (status == 1)
910   {
911     struct sigaction sigchld;
912
913     memset (&sigchld, 0, sizeof (sigchld));
914     sigchld.sa_handler = SIG_IGN;
915     sigaction (SIGCHLD, &sigchld, NULL);
916
917     return (0);
918   }
919   else if (status != 0)
920   {
921     fprintf (stderr, "daemonize failed, exiting.\n");
922     return (1);
923   }
924
925   listen_thread_main (NULL);
926
927   cleanup ();
928
929   return (0);
930 } /* int main */
931
932 /*
933  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
934  */