bd9db79aef4bf215fe1052592122eab5fa65e506
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 enum queue_side_e
120 {
121   HEAD,
122   TAIL
123 };
124 typedef enum queue_side_e queue_side_t;
125
126 /*
127  * Variables
128  */
129 static listen_socket_t *listen_fds = NULL;
130 static size_t listen_fds_num = 0;
131
132 static int do_shutdown = 0;
133
134 static pthread_t queue_thread;
135
136 static pthread_t *connetion_threads = NULL;
137 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
138 static int connetion_threads_num = 0;
139
140 /* Cache stuff */
141 static GTree          *cache_tree = NULL;
142 static cache_item_t   *cache_queue_head = NULL;
143 static cache_item_t   *cache_queue_tail = NULL;
144 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
145 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
146
147 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
148
149 static int config_write_interval = 300;
150 static int config_flush_interval = 3600;
151
152 static char **config_listen_address_list = NULL;
153 static int config_listen_address_list_len = 0;
154
155 /* 
156  * Functions
157  */
158 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
159 {
160   do_shutdown++;
161 } /* }}} void sig_int_handler */
162
163 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
164 {
165   do_shutdown++;
166 } /* }}} void sig_term_handler */
167
168 /*
169  * enqueue_cache_item:
170  * `cache_lock' must be acquired before calling this function!
171  */
172 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
173     queue_side_t side)
174 {
175   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
176       ci->file);
177
178   if (ci == NULL)
179     return (-1);
180
181   if (ci->values_num == 0)
182     return (0);
183
184   if (side == HEAD)
185   {
186     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
187     {
188       assert (ci->next == NULL);
189       ci->next = cache_queue_head;
190       cache_queue_head = ci;
191
192       if (cache_queue_tail == NULL)
193         cache_queue_tail = cache_queue_head;
194     }
195     else if (cache_queue_head == ci)
196     {
197       /* do nothing */
198     }
199     else /* enqueued, but not first entry */
200     {
201       cache_item_t *prev;
202
203       /* find previous entry */
204       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
205         if (prev->next == ci)
206           break;
207       assert (prev != NULL);
208
209       /* move to the front */
210       prev->next = ci->next;
211       ci->next = cache_queue_head;
212       cache_queue_head = ci;
213
214       /* check if we need to adapt the tail */
215       if (cache_queue_tail == ci)
216         cache_queue_tail = prev;
217     }
218   }
219   else /* (side == TAIL) */
220   {
221     /* We don't move values back in the list.. */
222     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
223       return (0);
224
225     assert (ci->next == NULL);
226
227     if (cache_queue_tail == NULL)
228       cache_queue_head = ci;
229     else
230       cache_queue_tail->next = ci;
231     cache_queue_tail = ci;
232   }
233
234   ci->flags |= CI_FLAGS_IN_QUEUE;
235
236   return (0);
237 } /* }}} int enqueue_cache_item */
238
239 /*
240  * tree_callback_flush:
241  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
242  * while this is in progress.
243  */
244 static gboolean tree_callback_flush (gpointer key /* {{{ */
245     __attribute__((unused)), gpointer value, gpointer data)
246 {
247   cache_item_t *ci;
248   time_t now;
249
250   key = NULL; /* make compiler happy */
251
252   ci = (cache_item_t *) value;
253   now = *((time_t *) data);
254
255   if (((now - ci->last_flush_time) >= config_write_interval)
256       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
257       && (ci->values_num > 0))
258     enqueue_cache_item (ci, TAIL);
259
260   return (TRUE);
261 } /* }}} gboolean tree_callback_flush */
262
263 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
264 {
265   struct timeval now;
266   struct timespec next_flush;
267
268   gettimeofday (&now, NULL);
269   next_flush.tv_sec = now.tv_sec + config_flush_interval;
270   next_flush.tv_nsec = 1000 * now.tv_usec;
271
272   pthread_mutex_lock (&cache_lock);
273   while ((do_shutdown == 0) || (cache_queue_head != NULL))
274   {
275     cache_item_t *ci;
276     char *file;
277     char **values;
278     int values_num;
279     int status;
280     int i;
281
282     /* First, check if it's time to do the cache flush. */
283     gettimeofday (&now, NULL);
284     if ((now.tv_sec > next_flush.tv_sec)
285         || ((now.tv_sec == next_flush.tv_sec)
286           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
287     {
288       time_t time_now;
289
290       /* Pass the current time as user data so that we don't need to call
291        * `time' for each node. */
292       time_now = time (NULL);
293
294       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
295
296       /* Determine the time of the next cache flush. */
297       while (next_flush.tv_sec < now.tv_sec)
298         next_flush.tv_sec += config_flush_interval;
299     }
300
301     /* Now, check if there's something to store away. If not, wait until
302      * something comes in or it's time to do the cache flush. */
303     if (cache_queue_head == NULL)
304     {
305       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
306       if ((status != 0) && (status != ETIMEDOUT))
307       {
308         RRDD_LOG (LOG_ERR, "queue_thread_main: "
309             "pthread_cond_timedwait returned %i.", status);
310       }
311     }
312
313     /* Check if a value has arrived. This may be NULL if we timed out or there
314      * was an interrupt such as a signal. */
315     if (cache_queue_head == NULL)
316       continue;
317
318     ci = cache_queue_head;
319
320     /* copy the relevant parts */
321     file = strdup (ci->file);
322     if (file == NULL)
323     {
324       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
325       continue;
326     }
327
328     values = ci->values;
329     values_num = ci->values_num;
330
331     ci->values = NULL;
332     ci->values_num = 0;
333
334     ci->last_flush_time = time (NULL);
335     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
336
337     cache_queue_head = ci->next;
338     if (cache_queue_head == NULL)
339       cache_queue_tail = NULL;
340     ci->next = NULL;
341
342     pthread_mutex_unlock (&cache_lock);
343
344     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
345         file, values_num, (void *) values);
346
347     status = rrd_update_r (file, NULL, values_num, (void *) values);
348     if (status != 0)
349     {
350       RRDD_LOG (LOG_ERR, "queue_thread_main: "
351           "rrd_update_r failed with status %i.",
352           status);
353     }
354
355     free (file);
356     for (i = 0; i < values_num; i++)
357       free (values[i]);
358
359     pthread_mutex_lock (&cache_lock);
360     pthread_cond_broadcast (&flush_cond);
361   } /* while (do_shutdown == 0) */
362   pthread_mutex_unlock (&cache_lock);
363
364   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
365
366   return (NULL);
367 } /* }}} void *queue_thread_main */
368
369 static int buffer_get_field (char **buffer_ret, /* {{{ */
370     size_t *buffer_size_ret, char **field_ret)
371 {
372   char *buffer;
373   size_t buffer_pos;
374   size_t buffer_size;
375   char *field;
376   size_t field_size;
377   int status;
378
379   buffer = *buffer_ret;
380   buffer_pos = 0;
381   buffer_size = *buffer_size_ret;
382   field = *buffer_ret;
383   field_size = 0;
384
385   /* This is ensured by `handle_request'. */
386   assert (buffer[buffer_size - 1] == ' ');
387
388   status = -1;
389   while (buffer_pos < buffer_size)
390   {
391     /* Check for end-of-field or end-of-buffer */
392     if (buffer[buffer_pos] == ' ')
393     {
394       field[field_size] = 0;
395       field_size++;
396       buffer_pos++;
397       status = 0;
398       break;
399     }
400     /* Handle escaped characters. */
401     else if (buffer[buffer_pos] == '\\')
402     {
403       if (buffer_pos >= (buffer_size - 1))
404         break;
405       buffer_pos++;
406       field[field_size] = buffer[buffer_pos];
407       field_size++;
408       buffer_pos++;
409     }
410     /* Normal operation */ 
411     else
412     {
413       field[field_size] = buffer[buffer_pos];
414       field_size++;
415       buffer_pos++;
416     }
417   } /* while (buffer_pos < buffer_size) */
418
419   if (status != 0)
420     return (status);
421
422   *buffer_ret = buffer + buffer_pos;
423   *buffer_size_ret = buffer_size - buffer_pos;
424   *field_ret = field;
425
426   return (0);
427 } /* }}} int buffer_get_field */
428
429 static int flush_file (const char *filename) /* {{{ */
430 {
431   cache_item_t *ci;
432
433   pthread_mutex_lock (&cache_lock);
434
435   ci = g_tree_lookup (cache_tree, filename);
436   if (ci == NULL)
437   {
438     pthread_mutex_unlock (&cache_lock);
439     return (ENOENT);
440   }
441
442   /* Enqueue at head */
443   enqueue_cache_item (ci, HEAD);
444   pthread_cond_signal (&cache_cond);
445
446   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
447   {
448     ci = NULL;
449
450     pthread_cond_wait (&flush_cond, &cache_lock);
451
452     ci = g_tree_lookup (cache_tree, filename);
453     if (ci == NULL)
454     {
455       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
456           "while waiting for flush.");
457       pthread_mutex_unlock (&cache_lock);
458       return (-1);
459     }
460   }
461
462   pthread_mutex_unlock (&cache_lock);
463   return (0);
464 } /* }}} int flush_file */
465
466 static int handle_request_flush (int fd, /* {{{ */
467     char *buffer, size_t buffer_size)
468 {
469   char *file;
470   int status;
471   char result[4096];
472
473   status = buffer_get_field (&buffer, &buffer_size, &file);
474   if (status != 0)
475   {
476     RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name.");
477     return (-1);
478   }
479
480   status = flush_file (file);
481   if (status == 0)
482     snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
483   else if (status == ENOENT)
484     snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
485   else if (status < 0)
486     strncpy (result, "-1 Internal error.\n", sizeof (result));
487   else
488     snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
489   result[sizeof (result) - 1] = 0;
490
491   status = write (fd, result, strlen (result));
492   if (status < 0)
493   {
494     status = errno;
495     RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error.");
496     return (status);
497   }
498
499   return (0);
500 } /* }}} int handle_request_flush */
501
502 static int handle_request_update (int fd, /* {{{ */
503     char *buffer, size_t buffer_size)
504 {
505   char *file;
506   int values_num = 0;
507   int status;
508
509   time_t now;
510
511   cache_item_t *ci;
512   char answer[4096];
513
514   now = time (NULL);
515
516   status = buffer_get_field (&buffer, &buffer_size, &file);
517   if (status != 0)
518   {
519     RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
520     return (-1);
521   }
522
523   pthread_mutex_lock (&cache_lock);
524
525   ci = g_tree_lookup (cache_tree, file);
526   if (ci == NULL) /* {{{ */
527   {
528     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
529     if (ci == NULL)
530     {
531       pthread_mutex_unlock (&cache_lock);
532       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
533       return (-1);
534     }
535     memset (ci, 0, sizeof (cache_item_t));
536
537     ci->file = strdup (file);
538     if (ci->file == NULL)
539     {
540       pthread_mutex_unlock (&cache_lock);
541       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
542       free (ci);
543       return (-1);
544     }
545
546     ci->values = NULL;
547     ci->values_num = 0;
548     ci->last_flush_time = now;
549     ci->flags = CI_FLAGS_IN_TREE;
550
551     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
552
553     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
554         ci->file);
555   } /* }}} */
556   assert (ci != NULL);
557
558   while (buffer_size > 0)
559   {
560     char **temp;
561     char *value;
562
563     status = buffer_get_field (&buffer, &buffer_size, &value);
564     if (status != 0)
565     {
566       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
567       break;
568     }
569
570     temp = (char **) realloc (ci->values,
571         sizeof (char *) * (ci->values_num + 1));
572     if (temp == NULL)
573     {
574       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
575       continue;
576     }
577     ci->values = temp;
578
579     ci->values[ci->values_num] = strdup (value);
580     if (ci->values[ci->values_num] == NULL)
581     {
582       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
583       continue;
584     }
585     ci->values_num++;
586
587     values_num++;
588   }
589
590   if (((now - ci->last_flush_time) >= config_write_interval)
591       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
592       && (ci->values_num > 0))
593   {
594     enqueue_cache_item (ci, TAIL);
595     pthread_cond_signal (&cache_cond);
596   }
597
598   pthread_mutex_unlock (&cache_lock);
599
600   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
601   answer[sizeof (answer) - 1] = 0;
602
603   status = write (fd, answer, strlen (answer));
604   if (status < 0)
605   {
606     status = errno;
607     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
608     return (status);
609   }
610
611   return (0);
612 } /* }}} int handle_request_update */
613
614 static int handle_request (int fd) /* {{{ */
615 {
616   char buffer[4096];
617   size_t buffer_size;
618   char *buffer_ptr;
619   char *command;
620   int status;
621
622   status = read (fd, buffer, sizeof (buffer));
623   if (status < 1)
624   {
625     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
626     return (-1);
627   }
628   buffer_size = status;
629   assert (((size_t) buffer_size) <= sizeof (buffer));
630
631   if (buffer[buffer_size - 1] != '\n')
632   {
633     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
634     return (-1);
635   }
636   /* Place the normal field separator at the end to simplify
637    * `buffer_get_field's work. */
638   buffer[buffer_size - 1] = ' ';
639
640   buffer_ptr = buffer;
641   command = NULL;
642   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
643   if (status != 0)
644   {
645     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
646     return (-1);
647   }
648
649   if (strcmp (command, "update") == 0)
650   {
651     return (handle_request_update (fd, buffer_ptr, buffer_size));
652   }
653   else if (strcmp (command, "flush") == 0)
654   {
655     return (handle_request_flush (fd, buffer_ptr, buffer_size));
656   }
657   else
658   {
659     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
660     return (-1);
661   }
662 } /* }}} int handle_request */
663
664 static void *connection_thread_main (void *args /* {{{ */
665     __attribute__((unused)))
666 {
667   pthread_t self;
668   int i;
669   int fd;
670   
671   fd = *((int *) args);
672
673   pthread_mutex_lock (&connetion_threads_lock);
674   {
675     pthread_t *temp;
676
677     temp = (pthread_t *) realloc (connetion_threads,
678         sizeof (pthread_t) * (connetion_threads_num + 1));
679     if (temp == NULL)
680     {
681       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
682     }
683     else
684     {
685       connetion_threads = temp;
686       connetion_threads[connetion_threads_num] = pthread_self ();
687       connetion_threads_num++;
688     }
689   }
690   pthread_mutex_unlock (&connetion_threads_lock);
691
692   while (do_shutdown == 0)
693   {
694     struct pollfd pollfd;
695     int status;
696
697     pollfd.fd = fd;
698     pollfd.events = POLLIN | POLLPRI;
699     pollfd.revents = 0;
700
701     status = poll (&pollfd, 1, /* timeout = */ 500);
702     if (status == 0) /* timeout */
703       continue;
704     else if (status < 0) /* error */
705     {
706       status = errno;
707       if (status == EINTR)
708         continue;
709       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
710       continue;
711     }
712
713     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
714     {
715       close (fd);
716       break;
717     }
718     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
719     {
720       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
721           "poll(2) returned something unexpected: %#04hx",
722           pollfd.revents);
723       close (fd);
724       break;
725     }
726
727     status = handle_request (fd);
728     if (status != 0)
729     {
730       close (fd);
731       break;
732     }
733   }
734
735   self = pthread_self ();
736   /* Remove this thread from the connection threads list */
737   pthread_mutex_lock (&connetion_threads_lock);
738   /* Find out own index in the array */
739   for (i = 0; i < connetion_threads_num; i++)
740     if (pthread_equal (connetion_threads[i], self) != 0)
741       break;
742   assert (i < connetion_threads_num);
743
744   /* Move the trailing threads forward. */
745   if (i < (connetion_threads_num - 1))
746   {
747     memmove (connetion_threads + i,
748         connetion_threads + i + 1,
749         sizeof (pthread_t) * (connetion_threads_num - i - 1));
750   }
751
752   connetion_threads_num--;
753   pthread_mutex_unlock (&connetion_threads_lock);
754
755   free (args);
756   return (NULL);
757 } /* }}} void *connection_thread_main */
758
759 static int open_listen_socket_unix (const char *path) /* {{{ */
760 {
761   int fd;
762   struct sockaddr_un sa;
763   listen_socket_t *temp;
764   int status;
765
766   temp = (listen_socket_t *) realloc (listen_fds,
767       sizeof (listen_fds[0]) * (listen_fds_num + 1));
768   if (temp == NULL)
769   {
770     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
771     return (-1);
772   }
773   listen_fds = temp;
774   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
775
776   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
777   if (fd < 0)
778   {
779     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
780     return (-1);
781   }
782
783   memset (&sa, 0, sizeof (sa));
784   sa.sun_family = AF_UNIX;
785   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
786
787   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
788   if (status != 0)
789   {
790     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
791     close (fd);
792     unlink (path);
793     return (-1);
794   }
795
796   status = listen (fd, /* backlog = */ 10);
797   if (status != 0)
798   {
799     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
800     close (fd);
801     unlink (path);
802     return (-1);
803   }
804   
805   listen_fds[listen_fds_num].fd = fd;
806   snprintf (listen_fds[listen_fds_num].path,
807       sizeof (listen_fds[listen_fds_num].path) - 1,
808       "unix:%s", path);
809   listen_fds_num++;
810
811   return (0);
812 } /* }}} int open_listen_socket_unix */
813
814 static int open_listen_socket (const char *addr) /* {{{ */
815 {
816   struct addrinfo ai_hints;
817   struct addrinfo *ai_res;
818   struct addrinfo *ai_ptr;
819   int status;
820
821   assert (addr != NULL);
822
823   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
824     return (open_listen_socket_unix (addr + strlen ("unix:")));
825   else if (addr[0] == '/')
826     return (open_listen_socket_unix (addr));
827
828   memset (&ai_hints, 0, sizeof (ai_hints));
829   ai_hints.ai_flags = 0;
830 #ifdef AI_ADDRCONFIG
831   ai_hints.ai_flags |= AI_ADDRCONFIG;
832 #endif
833   ai_hints.ai_family = AF_UNSPEC;
834   ai_hints.ai_socktype = SOCK_STREAM;
835
836   ai_res = NULL;
837   status = getaddrinfo (addr, DEFAULT_PORT, &ai_hints, &ai_res);
838   if (status != 0)
839   {
840     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
841         "%s", addr, gai_strerror (status));
842     return (-1);
843   }
844
845   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
846   {
847     int fd;
848     listen_socket_t *temp;
849
850     temp = (listen_socket_t *) realloc (listen_fds,
851         sizeof (listen_fds[0]) * (listen_fds_num + 1));
852     if (temp == NULL)
853     {
854       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
855       continue;
856     }
857     listen_fds = temp;
858     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
859
860     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
861     if (fd < 0)
862     {
863       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
864       continue;
865     }
866
867     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
868     if (status != 0)
869     {
870       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
871       close (fd);
872       continue;
873     }
874
875     status = listen (fd, /* backlog = */ 10);
876     if (status != 0)
877     {
878       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
879       close (fd);
880       return (-1);
881     }
882
883     listen_fds[listen_fds_num].fd = fd;
884     strncpy (listen_fds[listen_fds_num].path, addr,
885         sizeof (listen_fds[listen_fds_num].path) - 1);
886     listen_fds_num++;
887   } /* for (ai_ptr) */
888
889   return (0);
890 } /* }}} int open_listen_socket */
891
892 static int close_listen_sockets (void) /* {{{ */
893 {
894   size_t i;
895
896   for (i = 0; i < listen_fds_num; i++)
897   {
898     close (listen_fds[i].fd);
899     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
900       unlink (listen_fds[i].path + strlen ("unix:"));
901   }
902
903   free (listen_fds);
904   listen_fds = NULL;
905   listen_fds_num = 0;
906
907   return (0);
908 } /* }}} int close_listen_sockets */
909
910 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
911 {
912   struct pollfd *pollfds;
913   int pollfds_num;
914   int status;
915   int i;
916
917   for (i = 0; i < config_listen_address_list_len; i++)
918   {
919     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
920         "= %s", i, config_listen_address_list[i]);
921     open_listen_socket (config_listen_address_list[i]);
922   }
923
924   if (config_listen_address_list_len < 1)
925     open_listen_socket (RRDD_SOCK_PATH);
926
927   if (listen_fds_num < 1)
928   {
929     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
930         "could be opened. Sorry.");
931     return (NULL);
932   }
933
934   pollfds_num = listen_fds_num;
935   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
936   if (pollfds == NULL)
937   {
938     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
939     return (NULL);
940   }
941   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
942
943   while (do_shutdown == 0)
944   {
945     assert (pollfds_num == ((int) listen_fds_num));
946     for (i = 0; i < pollfds_num; i++)
947     {
948       pollfds[i].fd = listen_fds[i].fd;
949       pollfds[i].events = POLLIN | POLLPRI;
950       pollfds[i].revents = 0;
951     }
952
953     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
954     if (status < 1)
955     {
956       status = errno;
957       if (status != EINTR)
958       {
959         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
960       }
961       continue;
962     }
963
964     for (i = 0; i < pollfds_num; i++)
965     {
966       int *client_sd;
967       struct sockaddr_storage client_sa;
968       socklen_t client_sa_size;
969       pthread_t tid;
970
971       if (pollfds[i].revents == 0)
972         continue;
973
974       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
975       {
976         RRDD_LOG (LOG_ERR, "listen_thread_main: "
977             "poll(2) returned something unexpected for listen FD #%i.",
978             pollfds[i].fd);
979         continue;
980       }
981
982       client_sd = (int *) malloc (sizeof (int));
983       if (client_sd == NULL)
984       {
985         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
986         continue;
987       }
988
989       client_sa_size = sizeof (client_sa);
990       *client_sd = accept (pollfds[i].fd,
991           (struct sockaddr *) &client_sa, &client_sa_size);
992       if (*client_sd < 0)
993       {
994         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
995         continue;
996       }
997
998       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
999           /* args = */ (void *) client_sd);
1000       if (status != 0)
1001       {
1002         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1003         close (*client_sd);
1004         free (client_sd);
1005         continue;
1006       }
1007     } /* for (pollfds_num) */
1008   } /* while (do_shutdown == 0) */
1009
1010   close_listen_sockets ();
1011
1012   pthread_mutex_lock (&connetion_threads_lock);
1013   while (connetion_threads_num > 0)
1014   {
1015     pthread_t wait_for;
1016
1017     wait_for = connetion_threads[0];
1018
1019     pthread_mutex_unlock (&connetion_threads_lock);
1020     pthread_join (wait_for, /* retval = */ NULL);
1021     pthread_mutex_lock (&connetion_threads_lock);
1022   }
1023   pthread_mutex_unlock (&connetion_threads_lock);
1024
1025   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
1026
1027   return (NULL);
1028 } /* }}} void *listen_thread_main */
1029
1030 static int daemonize (void) /* {{{ */
1031 {
1032   pid_t child;
1033   int status;
1034
1035   child = fork ();
1036   if (child < 0)
1037   {
1038     fprintf (stderr, "daemonize: fork(2) failed.\n");
1039     return (-1);
1040   }
1041   else if (child > 0)
1042   {
1043     return (1);
1044   }
1045
1046   /* Change into the /tmp directory. */
1047   chdir ("/tmp");
1048
1049   /* Become session leader */
1050   setsid ();
1051
1052   /* Open the first three file descriptors to /dev/null */
1053   close (2);
1054   close (1);
1055   close (0);
1056
1057   open ("/dev/null", O_RDWR);
1058   dup (0);
1059   dup (0);
1060
1061   {
1062     struct sigaction sa;
1063
1064     memset (&sa, 0, sizeof (sa));
1065     sa.sa_handler = sig_int_handler;
1066     sigaction (SIGINT, &sa, NULL);
1067
1068     memset (&sa, 0, sizeof (sa));
1069     sa.sa_handler = sig_term_handler;
1070     sigaction (SIGINT, &sa, NULL);
1071
1072     memset (&sa, 0, sizeof (sa));
1073     sa.sa_handler = SIG_IGN;
1074     sigaction (SIGPIPE, &sa, NULL);
1075   }
1076
1077   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1078
1079   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1080   if (cache_tree == NULL)
1081   {
1082     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1083     return (-1);
1084   }
1085
1086   memset (&queue_thread, 0, sizeof (queue_thread));
1087   status = pthread_create (&queue_thread, /* attr = */ NULL,
1088       queue_thread_main, /* args = */ NULL);
1089   if (status != 0)
1090   {
1091     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1092     return (-1);
1093   }
1094
1095   return (0);
1096 } /* }}} int daemonize */
1097
1098 static int cleanup (void) /* {{{ */
1099 {
1100   RRDD_LOG (LOG_DEBUG, "cleanup ()");
1101
1102   do_shutdown++;
1103
1104   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
1105   pthread_cond_signal (&cache_cond);
1106   pthread_join (queue_thread, /* return = */ NULL);
1107   RRDD_LOG (LOG_DEBUG, "cleanup: done");
1108
1109   closelog ();
1110
1111   return (0);
1112 } /* }}} int cleanup */
1113
1114 static int read_options (int argc, char **argv) /* {{{ */
1115 {
1116   int option;
1117   int status = 0;
1118
1119   while ((option = getopt(argc, argv, "l:f:w:h?")) != -1)
1120   {
1121     switch (option)
1122     {
1123       case 'l':
1124       {
1125         char **temp;
1126
1127         temp = (char **) realloc (config_listen_address_list,
1128             sizeof (char *) * (config_listen_address_list_len + 1));
1129         if (temp == NULL)
1130         {
1131           fprintf (stderr, "read_options: realloc failed.\n");
1132           return (2);
1133         }
1134         config_listen_address_list = temp;
1135
1136         temp[config_listen_address_list_len] = strdup (optarg);
1137         if (temp[config_listen_address_list_len] == NULL)
1138         {
1139           fprintf (stderr, "read_options: strdup failed.\n");
1140           return (2);
1141         }
1142         config_listen_address_list_len++;
1143       }
1144       break;
1145
1146       case 'f':
1147       {
1148         int temp;
1149
1150         temp = atoi (optarg);
1151         if (temp > 0)
1152           config_flush_interval = temp;
1153         else
1154         {
1155           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1156           status = 3;
1157         }
1158       }
1159       break;
1160
1161       case 'w':
1162       {
1163         int temp;
1164
1165         temp = atoi (optarg);
1166         if (temp > 0)
1167           config_write_interval = temp;
1168         else
1169         {
1170           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1171           status = 2;
1172         }
1173       }
1174       break;
1175
1176       case 'h':
1177       case '?':
1178         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1179             "\n"
1180             "Usage: rrdcached [options]\n"
1181             "\n"
1182             "Valid options are:\n"
1183             "  -l <address>  Socket address to listen to.\n"
1184             "  -w <seconds>  Interval in which to write data.\n"
1185             "  -f <seconds>  Interval in which to flush dead data.\n"
1186             "\n"
1187             "For more information and a detailed description of all options "
1188             "please refer\n"
1189             "to the rrdcached(1) manual page.\n",
1190             VERSION);
1191         status = -1;
1192         break;
1193     } /* switch (option) */
1194   } /* while (getopt) */
1195
1196   return (status);
1197 } /* }}} int read_options */
1198
1199 int main (int argc, char **argv)
1200 {
1201   int status;
1202
1203   status = read_options (argc, argv);
1204   if (status != 0)
1205   {
1206     if (status < 0)
1207       status = 0;
1208     return (status);
1209   }
1210
1211   status = daemonize ();
1212   if (status == 1)
1213   {
1214     struct sigaction sigchld;
1215
1216     memset (&sigchld, 0, sizeof (sigchld));
1217     sigchld.sa_handler = SIG_IGN;
1218     sigaction (SIGCHLD, &sigchld, NULL);
1219
1220     return (0);
1221   }
1222   else if (status != 0)
1223   {
1224     fprintf (stderr, "daemonize failed, exiting.\n");
1225     return (1);
1226   }
1227
1228   listen_thread_main (NULL);
1229
1230   cleanup ();
1231
1232   return (0);
1233 } /* int main */
1234
1235 /*
1236  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1237  */