87ac8e9c581873abf57f99fead9952a28639bd0f
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69
70 #include <sys/types.h>
71 #include <sys/stat.h>
72 #include <fcntl.h>
73 #include <signal.h>
74 #include <sys/socket.h>
75 #include <sys/un.h>
76 #include <netdb.h>
77 #include <poll.h>
78 #include <syslog.h>
79 #include <pthread.h>
80 #include <errno.h>
81 #include <assert.h>
82 #include <sys/time.h>
83 #include <time.h>
84
85 #include <glib-2.0/glib.h>
86 /* }}} */
87
88 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
89
90 #ifndef __GNUC__
91 # define __attribute__(x) /**/
92 #endif
93
94 /*
95  * Types
96  */
97 struct listen_socket_s
98 {
99   int fd;
100   char path[PATH_MAX + 1];
101 };
102 typedef struct listen_socket_s listen_socket_t;
103
104 struct cache_item_s;
105 typedef struct cache_item_s cache_item_t;
106 struct cache_item_s
107 {
108   char *file;
109   char **values;
110   int values_num;
111   time_t last_flush_time;
112 #define CI_FLAGS_IN_TREE  0x01
113 #define CI_FLAGS_IN_QUEUE 0x02
114   int flags;
115
116   cache_item_t *next;
117 };
118
119 enum queue_side_e
120 {
121   HEAD,
122   TAIL
123 };
124 typedef enum queue_side_e queue_side_t;
125
126 /*
127  * Variables
128  */
129 static listen_socket_t *listen_fds = NULL;
130 static size_t listen_fds_num = 0;
131
132 static int do_shutdown = 0;
133
134 static pthread_t queue_thread;
135
136 static pthread_t *connetion_threads = NULL;
137 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
138 static int connetion_threads_num = 0;
139
140 /* Cache stuff */
141 static GTree          *cache_tree = NULL;
142 static cache_item_t   *cache_queue_head = NULL;
143 static cache_item_t   *cache_queue_tail = NULL;
144 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
145 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
146
147 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
148
149 static int config_write_interval = 300;
150 static int config_flush_interval = 3600;
151 static char *config_pid_file = NULL;
152 static char *config_base_dir = NULL;
153
154 static char **config_listen_address_list = NULL;
155 static int config_listen_address_list_len = 0;
156
157 /* 
158  * Functions
159  */
160 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
161 {
162   do_shutdown++;
163 } /* }}} void sig_int_handler */
164
165 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
166 {
167   do_shutdown++;
168 } /* }}} void sig_term_handler */
169
170 static int write_pidfile (void) /* {{{ */
171 {
172   pid_t pid;
173   char *file;
174   FILE *fh;
175
176   pid = getpid ();
177   
178   file = (config_pid_file != NULL)
179     ? config_pid_file
180     : LOCALSTATEDIR "/run/rrdcached.pid";
181
182   fh = fopen (file, "w");
183   if (fh == NULL)
184   {
185     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
186     return (-1);
187   }
188
189   fprintf (fh, "%i\n", (int) pid);
190   fclose (fh);
191
192   return (0);
193 } /* }}} int write_pidfile */
194
195 static int remove_pidfile (void) /* {{{ */
196 {
197   char *file;
198   int status;
199
200   file = (config_pid_file != NULL)
201     ? config_pid_file
202     : LOCALSTATEDIR "/run/rrdcached.pid";
203
204   status = unlink (file);
205   if (status == 0)
206     return (0);
207   return (errno);
208 } /* }}} int remove_pidfile */
209
210 /*
211  * enqueue_cache_item:
212  * `cache_lock' must be acquired before calling this function!
213  */
214 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
215     queue_side_t side)
216 {
217   RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.",
218       ci->file);
219
220   if (ci == NULL)
221     return (-1);
222
223   if (ci->values_num == 0)
224     return (0);
225
226   if (side == HEAD)
227   {
228     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
229     {
230       assert (ci->next == NULL);
231       ci->next = cache_queue_head;
232       cache_queue_head = ci;
233
234       if (cache_queue_tail == NULL)
235         cache_queue_tail = cache_queue_head;
236     }
237     else if (cache_queue_head == ci)
238     {
239       /* do nothing */
240     }
241     else /* enqueued, but not first entry */
242     {
243       cache_item_t *prev;
244
245       /* find previous entry */
246       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
247         if (prev->next == ci)
248           break;
249       assert (prev != NULL);
250
251       /* move to the front */
252       prev->next = ci->next;
253       ci->next = cache_queue_head;
254       cache_queue_head = ci;
255
256       /* check if we need to adapt the tail */
257       if (cache_queue_tail == ci)
258         cache_queue_tail = prev;
259     }
260   }
261   else /* (side == TAIL) */
262   {
263     /* We don't move values back in the list.. */
264     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
265       return (0);
266
267     assert (ci->next == NULL);
268
269     if (cache_queue_tail == NULL)
270       cache_queue_head = ci;
271     else
272       cache_queue_tail->next = ci;
273     cache_queue_tail = ci;
274   }
275
276   ci->flags |= CI_FLAGS_IN_QUEUE;
277
278   return (0);
279 } /* }}} int enqueue_cache_item */
280
281 /*
282  * tree_callback_flush:
283  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
284  * while this is in progress.
285  */
286 static gboolean tree_callback_flush (gpointer key /* {{{ */
287     __attribute__((unused)), gpointer value, gpointer data)
288 {
289   cache_item_t *ci;
290   time_t now;
291
292   key = NULL; /* make compiler happy */
293
294   ci = (cache_item_t *) value;
295   now = *((time_t *) data);
296
297   if (((now - ci->last_flush_time) >= config_write_interval)
298       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
299       && (ci->values_num > 0))
300     enqueue_cache_item (ci, TAIL);
301
302   return (TRUE);
303 } /* }}} gboolean tree_callback_flush */
304
305 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
306 {
307   struct timeval now;
308   struct timespec next_flush;
309
310   gettimeofday (&now, NULL);
311   next_flush.tv_sec = now.tv_sec + config_flush_interval;
312   next_flush.tv_nsec = 1000 * now.tv_usec;
313
314   pthread_mutex_lock (&cache_lock);
315   while ((do_shutdown == 0) || (cache_queue_head != NULL))
316   {
317     cache_item_t *ci;
318     char *file;
319     char **values;
320     int values_num;
321     int status;
322     int i;
323
324     /* First, check if it's time to do the cache flush. */
325     gettimeofday (&now, NULL);
326     if ((now.tv_sec > next_flush.tv_sec)
327         || ((now.tv_sec == next_flush.tv_sec)
328           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
329     {
330       time_t time_now;
331
332       /* Pass the current time as user data so that we don't need to call
333        * `time' for each node. */
334       time_now = time (NULL);
335
336       g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &time_now);
337
338       /* Determine the time of the next cache flush. */
339       while (next_flush.tv_sec < now.tv_sec)
340         next_flush.tv_sec += config_flush_interval;
341     }
342
343     /* Now, check if there's something to store away. If not, wait until
344      * something comes in or it's time to do the cache flush. */
345     if (cache_queue_head == NULL)
346     {
347       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
348       if ((status != 0) && (status != ETIMEDOUT))
349       {
350         RRDD_LOG (LOG_ERR, "queue_thread_main: "
351             "pthread_cond_timedwait returned %i.", status);
352       }
353     }
354
355     /* Check if a value has arrived. This may be NULL if we timed out or there
356      * was an interrupt such as a signal. */
357     if (cache_queue_head == NULL)
358       continue;
359
360     ci = cache_queue_head;
361
362     /* copy the relevant parts */
363     file = strdup (ci->file);
364     if (file == NULL)
365     {
366       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
367       continue;
368     }
369
370     values = ci->values;
371     values_num = ci->values_num;
372
373     ci->values = NULL;
374     ci->values_num = 0;
375
376     ci->last_flush_time = time (NULL);
377     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
378
379     cache_queue_head = ci->next;
380     if (cache_queue_head == NULL)
381       cache_queue_tail = NULL;
382     ci->next = NULL;
383
384     pthread_mutex_unlock (&cache_lock);
385
386     RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)",
387         file, values_num, (void *) values);
388
389     status = rrd_update_r (file, NULL, values_num, (void *) values);
390     if (status != 0)
391     {
392       RRDD_LOG (LOG_ERR, "queue_thread_main: "
393           "rrd_update_r failed with status %i.",
394           status);
395     }
396
397     free (file);
398     for (i = 0; i < values_num; i++)
399       free (values[i]);
400
401     pthread_mutex_lock (&cache_lock);
402     pthread_cond_broadcast (&flush_cond);
403   } /* while (do_shutdown == 0) */
404   pthread_mutex_unlock (&cache_lock);
405
406   RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting.");
407
408   return (NULL);
409 } /* }}} void *queue_thread_main */
410
411 static int buffer_get_field (char **buffer_ret, /* {{{ */
412     size_t *buffer_size_ret, char **field_ret)
413 {
414   char *buffer;
415   size_t buffer_pos;
416   size_t buffer_size;
417   char *field;
418   size_t field_size;
419   int status;
420
421   buffer = *buffer_ret;
422   buffer_pos = 0;
423   buffer_size = *buffer_size_ret;
424   field = *buffer_ret;
425   field_size = 0;
426
427   /* This is ensured by `handle_request'. */
428   assert (buffer[buffer_size - 1] == ' ');
429
430   status = -1;
431   while (buffer_pos < buffer_size)
432   {
433     /* Check for end-of-field or end-of-buffer */
434     if (buffer[buffer_pos] == ' ')
435     {
436       field[field_size] = 0;
437       field_size++;
438       buffer_pos++;
439       status = 0;
440       break;
441     }
442     /* Handle escaped characters. */
443     else if (buffer[buffer_pos] == '\\')
444     {
445       if (buffer_pos >= (buffer_size - 1))
446         break;
447       buffer_pos++;
448       field[field_size] = buffer[buffer_pos];
449       field_size++;
450       buffer_pos++;
451     }
452     /* Normal operation */ 
453     else
454     {
455       field[field_size] = buffer[buffer_pos];
456       field_size++;
457       buffer_pos++;
458     }
459   } /* while (buffer_pos < buffer_size) */
460
461   if (status != 0)
462     return (status);
463
464   *buffer_ret = buffer + buffer_pos;
465   *buffer_size_ret = buffer_size - buffer_pos;
466   *field_ret = field;
467
468   return (0);
469 } /* }}} int buffer_get_field */
470
471 static int flush_file (const char *filename) /* {{{ */
472 {
473   cache_item_t *ci;
474
475   pthread_mutex_lock (&cache_lock);
476
477   ci = g_tree_lookup (cache_tree, filename);
478   if (ci == NULL)
479   {
480     pthread_mutex_unlock (&cache_lock);
481     return (ENOENT);
482   }
483
484   /* Enqueue at head */
485   enqueue_cache_item (ci, HEAD);
486   pthread_cond_signal (&cache_cond);
487
488   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
489   {
490     ci = NULL;
491
492     pthread_cond_wait (&flush_cond, &cache_lock);
493
494     ci = g_tree_lookup (cache_tree, filename);
495     if (ci == NULL)
496     {
497       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
498           "while waiting for flush.");
499       pthread_mutex_unlock (&cache_lock);
500       return (-1);
501     }
502   }
503
504   pthread_mutex_unlock (&cache_lock);
505   return (0);
506 } /* }}} int flush_file */
507
508 static int handle_request_flush (int fd, /* {{{ */
509     char *buffer, size_t buffer_size)
510 {
511   char *file;
512   int status;
513   char result[4096];
514
515   status = buffer_get_field (&buffer, &buffer_size, &file);
516   if (status != 0)
517   {
518     RRDD_LOG (LOG_INFO, "handle_request_flush: Cannot get file name.");
519     return (-1);
520   }
521
522   status = flush_file (file);
523   if (status == 0)
524     snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
525   else if (status == ENOENT)
526     snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
527   else if (status < 0)
528     strncpy (result, "-1 Internal error.\n", sizeof (result));
529   else
530     snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
531   result[sizeof (result) - 1] = 0;
532
533   status = write (fd, result, strlen (result));
534   if (status < 0)
535   {
536     status = errno;
537     RRDD_LOG (LOG_INFO, "handle_request_flush: write(2) returned an error.");
538     return (status);
539   }
540
541   return (0);
542 } /* }}} int handle_request_flush */
543
544 static int handle_request_update (int fd, /* {{{ */
545     char *buffer, size_t buffer_size)
546 {
547   char *file;
548   int values_num = 0;
549   int status;
550
551   time_t now;
552
553   cache_item_t *ci;
554   char answer[4096];
555
556   now = time (NULL);
557
558   status = buffer_get_field (&buffer, &buffer_size, &file);
559   if (status != 0)
560   {
561     RRDD_LOG (LOG_INFO, "handle_request_update: Cannot get file name.");
562     return (-1);
563   }
564
565   pthread_mutex_lock (&cache_lock);
566
567   ci = g_tree_lookup (cache_tree, file);
568   if (ci == NULL) /* {{{ */
569   {
570     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
571     if (ci == NULL)
572     {
573       pthread_mutex_unlock (&cache_lock);
574       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
575       return (-1);
576     }
577     memset (ci, 0, sizeof (cache_item_t));
578
579     ci->file = strdup (file);
580     if (ci->file == NULL)
581     {
582       pthread_mutex_unlock (&cache_lock);
583       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
584       free (ci);
585       return (-1);
586     }
587
588     ci->values = NULL;
589     ci->values_num = 0;
590     ci->last_flush_time = now;
591     ci->flags = CI_FLAGS_IN_TREE;
592
593     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
594
595     RRDD_LOG (LOG_DEBUG, "handle_request_update: Created new tree node %s.",
596         ci->file);
597   } /* }}} */
598   assert (ci != NULL);
599
600   while (buffer_size > 0)
601   {
602     char **temp;
603     char *value;
604
605     status = buffer_get_field (&buffer, &buffer_size, &value);
606     if (status != 0)
607     {
608       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
609       break;
610     }
611
612     temp = (char **) realloc (ci->values,
613         sizeof (char *) * (ci->values_num + 1));
614     if (temp == NULL)
615     {
616       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
617       continue;
618     }
619     ci->values = temp;
620
621     ci->values[ci->values_num] = strdup (value);
622     if (ci->values[ci->values_num] == NULL)
623     {
624       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
625       continue;
626     }
627     ci->values_num++;
628
629     values_num++;
630   }
631
632   if (((now - ci->last_flush_time) >= config_write_interval)
633       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
634       && (ci->values_num > 0))
635   {
636     enqueue_cache_item (ci, TAIL);
637     pthread_cond_signal (&cache_cond);
638   }
639
640   pthread_mutex_unlock (&cache_lock);
641
642   snprintf (answer, sizeof (answer), "0 Enqueued %i value(s)\n", values_num);
643   answer[sizeof (answer) - 1] = 0;
644
645   status = write (fd, answer, strlen (answer));
646   if (status < 0)
647   {
648     status = errno;
649     RRDD_LOG (LOG_INFO, "handle_request_update: write(2) returned an error.");
650     return (status);
651   }
652
653   return (0);
654 } /* }}} int handle_request_update */
655
656 static int handle_request (int fd) /* {{{ */
657 {
658   char buffer[4096];
659   size_t buffer_size;
660   char *buffer_ptr;
661   char *command;
662   int status;
663
664   status = read (fd, buffer, sizeof (buffer));
665   if (status == 0)
666   {
667     return (1);
668   }
669   else if (status < 0)
670   {
671     RRDD_LOG (LOG_ERR, "handle_request: read(2) failed.");
672     return (-1);
673   }
674   buffer_size = status;
675   assert (((size_t) buffer_size) <= sizeof (buffer));
676
677   if (buffer[buffer_size - 1] != '\n')
678   {
679     RRDD_LOG (LOG_INFO, "handle_request: malformed request.");
680     return (-1);
681   }
682
683   /* Accept Windows style line endings, too */
684   if ((buffer_size > 2) && (buffer[buffer_size - 2] == '\r'))
685   {
686     buffer_size--;
687     buffer[buffer_size - 1] = '\n';
688   }
689
690   /* Place the normal field separator at the end to simplify
691    * `buffer_get_field's work. */
692   buffer[buffer_size - 1] = ' ';
693
694   buffer_ptr = buffer;
695   command = NULL;
696   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
697   if (status != 0)
698   {
699     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
700     return (-1);
701   }
702
703   if (strcmp (command, "update") == 0)
704   {
705     return (handle_request_update (fd, buffer_ptr, buffer_size));
706   }
707   else if (strcmp (command, "flush") == 0)
708   {
709     return (handle_request_flush (fd, buffer_ptr, buffer_size));
710   }
711   else
712   {
713     RRDD_LOG (LOG_INFO, "handle_request: unknown command: %s.", buffer);
714     return (-1);
715   }
716 } /* }}} int handle_request */
717
718 static void *connection_thread_main (void *args /* {{{ */
719     __attribute__((unused)))
720 {
721   pthread_t self;
722   int i;
723   int fd;
724   
725   fd = *((int *) args);
726
727   pthread_mutex_lock (&connetion_threads_lock);
728   {
729     pthread_t *temp;
730
731     temp = (pthread_t *) realloc (connetion_threads,
732         sizeof (pthread_t) * (connetion_threads_num + 1));
733     if (temp == NULL)
734     {
735       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
736     }
737     else
738     {
739       connetion_threads = temp;
740       connetion_threads[connetion_threads_num] = pthread_self ();
741       connetion_threads_num++;
742     }
743   }
744   pthread_mutex_unlock (&connetion_threads_lock);
745
746   while (do_shutdown == 0)
747   {
748     struct pollfd pollfd;
749     int status;
750
751     pollfd.fd = fd;
752     pollfd.events = POLLIN | POLLPRI;
753     pollfd.revents = 0;
754
755     status = poll (&pollfd, 1, /* timeout = */ 500);
756     if (status == 0) /* timeout */
757       continue;
758     else if (status < 0) /* error */
759     {
760       status = errno;
761       if (status == EINTR)
762         continue;
763       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
764       continue;
765     }
766
767     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
768     {
769       close (fd);
770       break;
771     }
772     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
773     {
774       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
775           "poll(2) returned something unexpected: %#04hx",
776           pollfd.revents);
777       close (fd);
778       break;
779     }
780
781     status = handle_request (fd);
782     if (status != 0)
783     {
784       close (fd);
785       break;
786     }
787   }
788
789   self = pthread_self ();
790   /* Remove this thread from the connection threads list */
791   pthread_mutex_lock (&connetion_threads_lock);
792   /* Find out own index in the array */
793   for (i = 0; i < connetion_threads_num; i++)
794     if (pthread_equal (connetion_threads[i], self) != 0)
795       break;
796   assert (i < connetion_threads_num);
797
798   /* Move the trailing threads forward. */
799   if (i < (connetion_threads_num - 1))
800   {
801     memmove (connetion_threads + i,
802         connetion_threads + i + 1,
803         sizeof (pthread_t) * (connetion_threads_num - i - 1));
804   }
805
806   connetion_threads_num--;
807   pthread_mutex_unlock (&connetion_threads_lock);
808
809   free (args);
810   return (NULL);
811 } /* }}} void *connection_thread_main */
812
813 static int open_listen_socket_unix (const char *path) /* {{{ */
814 {
815   int fd;
816   struct sockaddr_un sa;
817   listen_socket_t *temp;
818   int status;
819
820   temp = (listen_socket_t *) realloc (listen_fds,
821       sizeof (listen_fds[0]) * (listen_fds_num + 1));
822   if (temp == NULL)
823   {
824     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
825     return (-1);
826   }
827   listen_fds = temp;
828   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
829
830   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
831   if (fd < 0)
832   {
833     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
834     return (-1);
835   }
836
837   memset (&sa, 0, sizeof (sa));
838   sa.sun_family = AF_UNIX;
839   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
840
841   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
842   if (status != 0)
843   {
844     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
845     close (fd);
846     unlink (path);
847     return (-1);
848   }
849
850   status = listen (fd, /* backlog = */ 10);
851   if (status != 0)
852   {
853     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
854     close (fd);
855     unlink (path);
856     return (-1);
857   }
858   
859   listen_fds[listen_fds_num].fd = fd;
860   snprintf (listen_fds[listen_fds_num].path,
861       sizeof (listen_fds[listen_fds_num].path) - 1,
862       "unix:%s", path);
863   listen_fds_num++;
864
865   return (0);
866 } /* }}} int open_listen_socket_unix */
867
868 static int open_listen_socket (const char *addr) /* {{{ */
869 {
870   struct addrinfo ai_hints;
871   struct addrinfo *ai_res;
872   struct addrinfo *ai_ptr;
873   int status;
874
875   assert (addr != NULL);
876
877   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
878     return (open_listen_socket_unix (addr + strlen ("unix:")));
879   else if (addr[0] == '/')
880     return (open_listen_socket_unix (addr));
881
882   memset (&ai_hints, 0, sizeof (ai_hints));
883   ai_hints.ai_flags = 0;
884 #ifdef AI_ADDRCONFIG
885   ai_hints.ai_flags |= AI_ADDRCONFIG;
886 #endif
887   ai_hints.ai_family = AF_UNSPEC;
888   ai_hints.ai_socktype = SOCK_STREAM;
889
890   ai_res = NULL;
891   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
892   if (status != 0)
893   {
894     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
895         "%s", addr, gai_strerror (status));
896     return (-1);
897   }
898
899   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
900   {
901     int fd;
902     listen_socket_t *temp;
903
904     temp = (listen_socket_t *) realloc (listen_fds,
905         sizeof (listen_fds[0]) * (listen_fds_num + 1));
906     if (temp == NULL)
907     {
908       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
909       continue;
910     }
911     listen_fds = temp;
912     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
913
914     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
915     if (fd < 0)
916     {
917       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
918       continue;
919     }
920
921     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
922     if (status != 0)
923     {
924       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
925       close (fd);
926       continue;
927     }
928
929     status = listen (fd, /* backlog = */ 10);
930     if (status != 0)
931     {
932       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
933       close (fd);
934       return (-1);
935     }
936
937     listen_fds[listen_fds_num].fd = fd;
938     strncpy (listen_fds[listen_fds_num].path, addr,
939         sizeof (listen_fds[listen_fds_num].path) - 1);
940     listen_fds_num++;
941   } /* for (ai_ptr) */
942
943   return (0);
944 } /* }}} int open_listen_socket */
945
946 static int close_listen_sockets (void) /* {{{ */
947 {
948   size_t i;
949
950   for (i = 0; i < listen_fds_num; i++)
951   {
952     close (listen_fds[i].fd);
953     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
954       unlink (listen_fds[i].path + strlen ("unix:"));
955   }
956
957   free (listen_fds);
958   listen_fds = NULL;
959   listen_fds_num = 0;
960
961   return (0);
962 } /* }}} int close_listen_sockets */
963
964 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
965 {
966   struct pollfd *pollfds;
967   int pollfds_num;
968   int status;
969   int i;
970
971   for (i = 0; i < config_listen_address_list_len; i++)
972   {
973     RRDD_LOG (LOG_DEBUG, "listen_thread_main: config_listen_address_list[%i] "
974         "= %s", i, config_listen_address_list[i]);
975     open_listen_socket (config_listen_address_list[i]);
976   }
977
978   if (config_listen_address_list_len < 1)
979     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
980
981   if (listen_fds_num < 1)
982   {
983     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
984         "could be opened. Sorry.");
985     return (NULL);
986   }
987
988   pollfds_num = listen_fds_num;
989   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
990   if (pollfds == NULL)
991   {
992     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
993     return (NULL);
994   }
995   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
996
997   while (do_shutdown == 0)
998   {
999     assert (pollfds_num == ((int) listen_fds_num));
1000     for (i = 0; i < pollfds_num; i++)
1001     {
1002       pollfds[i].fd = listen_fds[i].fd;
1003       pollfds[i].events = POLLIN | POLLPRI;
1004       pollfds[i].revents = 0;
1005     }
1006
1007     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1008     if (status < 1)
1009     {
1010       status = errno;
1011       if (status != EINTR)
1012       {
1013         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1014       }
1015       continue;
1016     }
1017
1018     for (i = 0; i < pollfds_num; i++)
1019     {
1020       int *client_sd;
1021       struct sockaddr_storage client_sa;
1022       socklen_t client_sa_size;
1023       pthread_t tid;
1024
1025       if (pollfds[i].revents == 0)
1026         continue;
1027
1028       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1029       {
1030         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1031             "poll(2) returned something unexpected for listen FD #%i.",
1032             pollfds[i].fd);
1033         continue;
1034       }
1035
1036       client_sd = (int *) malloc (sizeof (int));
1037       if (client_sd == NULL)
1038       {
1039         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1040         continue;
1041       }
1042
1043       client_sa_size = sizeof (client_sa);
1044       *client_sd = accept (pollfds[i].fd,
1045           (struct sockaddr *) &client_sa, &client_sa_size);
1046       if (*client_sd < 0)
1047       {
1048         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1049         continue;
1050       }
1051
1052       status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
1053           /* args = */ (void *) client_sd);
1054       if (status != 0)
1055       {
1056         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1057         close (*client_sd);
1058         free (client_sd);
1059         continue;
1060       }
1061     } /* for (pollfds_num) */
1062   } /* while (do_shutdown == 0) */
1063
1064   close_listen_sockets ();
1065
1066   pthread_mutex_lock (&connetion_threads_lock);
1067   while (connetion_threads_num > 0)
1068   {
1069     pthread_t wait_for;
1070
1071     wait_for = connetion_threads[0];
1072
1073     pthread_mutex_unlock (&connetion_threads_lock);
1074     pthread_join (wait_for, /* retval = */ NULL);
1075     pthread_mutex_lock (&connetion_threads_lock);
1076   }
1077   pthread_mutex_unlock (&connetion_threads_lock);
1078
1079   RRDD_LOG (LOG_DEBUG, "listen_thread_main: Exiting.");
1080
1081   return (NULL);
1082 } /* }}} void *listen_thread_main */
1083
1084 static int daemonize (void) /* {{{ */
1085 {
1086   pid_t child;
1087   int status;
1088   char *base_dir;
1089
1090   /* These structures are static, because `sigaction' behaves weird if the are
1091    * overwritten.. */
1092   static struct sigaction sa_int;
1093   static struct sigaction sa_term;
1094   static struct sigaction sa_pipe;
1095
1096   child = fork ();
1097   if (child < 0)
1098   {
1099     fprintf (stderr, "daemonize: fork(2) failed.\n");
1100     return (-1);
1101   }
1102   else if (child > 0)
1103   {
1104     return (1);
1105   }
1106
1107   /* Change into the /tmp directory. */
1108   base_dir = (config_base_dir != NULL)
1109     ? config_base_dir
1110     : "/tmp";
1111   status = chdir (base_dir);
1112   if (status != 0)
1113   {
1114     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1115     return (-1);
1116   }
1117
1118   /* Become session leader */
1119   setsid ();
1120
1121   /* Open the first three file descriptors to /dev/null */
1122   close (2);
1123   close (1);
1124   close (0);
1125
1126   open ("/dev/null", O_RDWR);
1127   dup (0);
1128   dup (0);
1129
1130   /* Install signal handlers */
1131   memset (&sa_int, 0, sizeof (sa_int));
1132   sa_int.sa_handler = sig_int_handler;
1133   sigaction (SIGINT, &sa_int, NULL);
1134
1135   memset (&sa_term, 0, sizeof (sa_term));
1136   sa_term.sa_handler = sig_term_handler;
1137   sigaction (SIGINT, &sa_term, NULL);
1138
1139   memset (&sa_pipe, 0, sizeof (sa_pipe));
1140   sa_pipe.sa_handler = SIG_IGN;
1141   sigaction (SIGPIPE, &sa_pipe, NULL);
1142
1143   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1144
1145   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1146   if (cache_tree == NULL)
1147   {
1148     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1149     return (-1);
1150   }
1151
1152   memset (&queue_thread, 0, sizeof (queue_thread));
1153   status = pthread_create (&queue_thread, /* attr = */ NULL,
1154       queue_thread_main, /* args = */ NULL);
1155   if (status != 0)
1156   {
1157     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1158     return (-1);
1159   }
1160
1161   write_pidfile ();
1162
1163   return (0);
1164 } /* }}} int daemonize */
1165
1166 static int cleanup (void) /* {{{ */
1167 {
1168   RRDD_LOG (LOG_DEBUG, "cleanup ()");
1169
1170   do_shutdown++;
1171
1172   RRDD_LOG (LOG_DEBUG, "cleanup: Joining queue_thread..");
1173   pthread_cond_signal (&cache_cond);
1174   pthread_join (queue_thread, /* return = */ NULL);
1175   RRDD_LOG (LOG_DEBUG, "cleanup: done");
1176
1177   remove_pidfile ();
1178
1179   closelog ();
1180
1181   return (0);
1182 } /* }}} int cleanup */
1183
1184 static int read_options (int argc, char **argv) /* {{{ */
1185 {
1186   int option;
1187   int status = 0;
1188
1189   while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
1190   {
1191     switch (option)
1192     {
1193       case 'l':
1194       {
1195         char **temp;
1196
1197         temp = (char **) realloc (config_listen_address_list,
1198             sizeof (char *) * (config_listen_address_list_len + 1));
1199         if (temp == NULL)
1200         {
1201           fprintf (stderr, "read_options: realloc failed.\n");
1202           return (2);
1203         }
1204         config_listen_address_list = temp;
1205
1206         temp[config_listen_address_list_len] = strdup (optarg);
1207         if (temp[config_listen_address_list_len] == NULL)
1208         {
1209           fprintf (stderr, "read_options: strdup failed.\n");
1210           return (2);
1211         }
1212         config_listen_address_list_len++;
1213       }
1214       break;
1215
1216       case 'f':
1217       {
1218         int temp;
1219
1220         temp = atoi (optarg);
1221         if (temp > 0)
1222           config_flush_interval = temp;
1223         else
1224         {
1225           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1226           status = 3;
1227         }
1228       }
1229       break;
1230
1231       case 'w':
1232       {
1233         int temp;
1234
1235         temp = atoi (optarg);
1236         if (temp > 0)
1237           config_write_interval = temp;
1238         else
1239         {
1240           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1241           status = 2;
1242         }
1243       }
1244       break;
1245
1246       case 'b':
1247       {
1248         size_t len;
1249
1250         if (config_base_dir != NULL)
1251           free (config_base_dir);
1252         config_base_dir = strdup (optarg);
1253         if (config_base_dir == NULL)
1254         {
1255           fprintf (stderr, "read_options: strdup failed.\n");
1256           return (3);
1257         }
1258
1259         len = strlen (config_base_dir);
1260         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1261         {
1262           config_base_dir[len - 1] = 0;
1263           len--;
1264         }
1265
1266         if (len < 1)
1267         {
1268           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1269           return (4);
1270         }
1271       }
1272       break;
1273
1274       case 'p':
1275       {
1276         if (config_pid_file != NULL)
1277           free (config_pid_file);
1278         config_pid_file = strdup (optarg);
1279         if (config_pid_file == NULL)
1280         {
1281           fprintf (stderr, "read_options: strdup failed.\n");
1282           return (3);
1283         }
1284       }
1285       break;
1286
1287       case 'h':
1288       case '?':
1289         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1290             "\n"
1291             "Usage: rrdcached [options]\n"
1292             "\n"
1293             "Valid options are:\n"
1294             "  -l <address>  Socket address to listen to.\n"
1295             "  -w <seconds>  Interval in which to write data.\n"
1296             "  -f <seconds>  Interval in which to flush dead data.\n"
1297             "  -p <file>     Location of the PID-file.\n"
1298             "  -b <dir>      Base directory to change to.\n"
1299             "\n"
1300             "For more information and a detailed description of all options "
1301             "please refer\n"
1302             "to the rrdcached(1) manual page.\n",
1303             VERSION);
1304         status = -1;
1305         break;
1306     } /* switch (option) */
1307   } /* while (getopt) */
1308
1309   return (status);
1310 } /* }}} int read_options */
1311
1312 int main (int argc, char **argv)
1313 {
1314   int status;
1315
1316   status = read_options (argc, argv);
1317   if (status != 0)
1318   {
1319     if (status < 0)
1320       status = 0;
1321     return (status);
1322   }
1323
1324   status = daemonize ();
1325   if (status == 1)
1326   {
1327     struct sigaction sigchld;
1328
1329     memset (&sigchld, 0, sizeof (sigchld));
1330     sigchld.sa_handler = SIG_IGN;
1331     sigaction (SIGCHLD, &sigchld, NULL);
1332
1333     return (0);
1334   }
1335   else if (status != 0)
1336   {
1337     fprintf (stderr, "daemonize failed, exiting.\n");
1338     return (1);
1339   }
1340
1341   listen_thread_main (NULL);
1342
1343   cleanup ();
1344
1345   return (0);
1346 } /* int main */
1347
1348 /*
1349  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1350  */