RRDcached patch. This implements an infrastructure, where rrd updates can be
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
21
22 /*
23  * First tell the compiler to stick to the C99 and POSIX standards as close as
24  * possible.
25  */
26 #ifndef __STRICT_ANSI__ /* {{{ */
27 # define __STRICT_ANSI__
28 #endif
29
30 #ifndef _ISOC99_SOURCE
31 # define _ISOC99_SOURCE
32 #endif
33
34 #ifdef _POSIX_C_SOURCE
35 # undef _POSIX_C_SOURCE
36 #endif
37 #define _POSIX_C_SOURCE 200112L
38
39 /* Single UNIX needed for strdup. */
40 #ifdef _XOPEN_SOURCE
41 # undef _XOPEN_SOURCE
42 #endif
43 #define _XOPEN_SOURCE 500
44
45 #ifndef _REENTRANT
46 # define _REENTRANT
47 #endif
48
49 #ifndef _THREAD_SAFE
50 # define _THREAD_SAFE
51 #endif
52
53 #ifdef _GNU_SOURCE
54 # undef _GNU_SOURCE
55 #endif
56 /* }}} */
57
58 /*
59  * Now for some includes..
60  */
61 #include "rrd.h" /* {{{ */
62 #include "rrd_client.h"
63
64 #include <stdlib.h>
65 #include <stdint.h>
66 #include <stdio.h>
67 #include <unistd.h>
68 #include <string.h>
69 #include <strings.h>
70 #include <stdint.h>
71 #include <inttypes.h>
72
73 #include <sys/types.h>
74 #include <sys/stat.h>
75 #include <fcntl.h>
76 #include <signal.h>
77 #include <sys/socket.h>
78 #include <sys/un.h>
79 #include <netdb.h>
80 #include <poll.h>
81 #include <syslog.h>
82 #include <pthread.h>
83 #include <errno.h>
84 #include <assert.h>
85 #include <sys/time.h>
86 #include <time.h>
87
88 #include <glib-2.0/glib.h>
89 /* }}} */
90
91 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
92
93 #ifndef __GNUC__
94 # define __attribute__(x) /**/
95 #endif
96
97 /*
98  * Types
99  */
100 struct listen_socket_s
101 {
102   int fd;
103   char path[PATH_MAX + 1];
104 };
105 typedef struct listen_socket_s listen_socket_t;
106
107 struct cache_item_s;
108 typedef struct cache_item_s cache_item_t;
109 struct cache_item_s
110 {
111   char *file;
112   char **values;
113   int values_num;
114   time_t last_flush_time;
115 #define CI_FLAGS_IN_TREE  0x01
116 #define CI_FLAGS_IN_QUEUE 0x02
117   int flags;
118
119   cache_item_t *next;
120 };
121
122 struct callback_flush_data_s
123 {
124   time_t now;
125   time_t abs_timeout;
126   char **keys;
127   size_t keys_num;
128 };
129 typedef struct callback_flush_data_s callback_flush_data_t;
130
131 enum queue_side_e
132 {
133   HEAD,
134   TAIL
135 };
136 typedef enum queue_side_e queue_side_t;
137
138 /*
139  * Variables
140  */
141 static listen_socket_t *listen_fds = NULL;
142 static size_t listen_fds_num = 0;
143
144 static int do_shutdown = 0;
145
146 static pthread_t queue_thread;
147
148 static pthread_t *connetion_threads = NULL;
149 static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
150 static int connetion_threads_num = 0;
151
152 /* Cache stuff */
153 static GTree          *cache_tree = NULL;
154 static cache_item_t   *cache_queue_head = NULL;
155 static cache_item_t   *cache_queue_tail = NULL;
156 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
157 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
158
159 static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
160
161 static int config_write_interval = 300;
162 static int config_flush_interval = 3600;
163 static char *config_pid_file = NULL;
164 static char *config_base_dir = NULL;
165
166 static char **config_listen_address_list = NULL;
167 static int config_listen_address_list_len = 0;
168
169 static uint64_t stats_queue_length = 0;
170 static uint64_t stats_updates_written = 0;
171 static uint64_t stats_data_sets_written = 0;
172 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
173
174 /* 
175  * Functions
176  */
177 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
178 {
179   do_shutdown++;
180 } /* }}} void sig_int_handler */
181
182 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
183 {
184   do_shutdown++;
185 } /* }}} void sig_term_handler */
186
187 static int write_pidfile (void) /* {{{ */
188 {
189   pid_t pid;
190   char *file;
191   FILE *fh;
192
193   pid = getpid ();
194   
195   file = (config_pid_file != NULL)
196     ? config_pid_file
197     : LOCALSTATEDIR "/run/rrdcached.pid";
198
199   fh = fopen (file, "w");
200   if (fh == NULL)
201   {
202     RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
203     return (-1);
204   }
205
206   fprintf (fh, "%i\n", (int) pid);
207   fclose (fh);
208
209   return (0);
210 } /* }}} int write_pidfile */
211
212 static int remove_pidfile (void) /* {{{ */
213 {
214   char *file;
215   int status;
216
217   file = (config_pid_file != NULL)
218     ? config_pid_file
219     : LOCALSTATEDIR "/run/rrdcached.pid";
220
221   status = unlink (file);
222   if (status == 0)
223     return (0);
224   return (errno);
225 } /* }}} int remove_pidfile */
226
227 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
228 {
229   char    *buffer;
230   size_t   buffer_used;
231   size_t   buffer_free;
232   ssize_t  status;
233
234   buffer       = (char *) buffer_void;
235   buffer_used  = 0;
236   buffer_free  = buffer_size;
237
238   while (buffer_free > 0)
239   {
240     status = read (fd, buffer + buffer_used, buffer_free);
241     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
242       continue;
243
244     if (status < 0)
245       return (-1);
246
247     if (status == 0)
248       return (0);
249
250     assert ((0 > status) || (buffer_free >= (size_t) status));
251
252     buffer_free = buffer_free - status;
253     buffer_used = buffer_used + status;
254
255     if (buffer[buffer_used - 1] == '\n')
256       break;
257   }
258
259   assert (buffer_used > 0);
260
261   if (buffer[buffer_used - 1] != '\n')
262   {
263     errno = ENOBUFS;
264     return (-1);
265   }
266
267   buffer[buffer_used - 1] = 0;
268
269   /* Fix network line endings. */
270   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
271   {
272     buffer_used--;
273     buffer[buffer_used - 1] = 0;
274   }
275
276   return (buffer_used);
277 } /* }}} ssize_t sread */
278
279 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
280 {
281   const char *ptr;
282   size_t      nleft;
283   ssize_t     status;
284
285   ptr   = (const char *) buf;
286   nleft = count;
287
288   while (nleft > 0)
289   {
290     status = write (fd, (const void *) ptr, nleft);
291
292     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
293       continue;
294
295     if (status < 0)
296       return (status);
297
298     nleft = nleft - status;
299     ptr   = ptr   + status;
300   }
301
302   return (0);
303 } /* }}} ssize_t swrite */
304
305 /*
306  * enqueue_cache_item:
307  * `cache_lock' must be acquired before calling this function!
308  */
309 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
310     queue_side_t side)
311 {
312   int did_insert = 0;
313
314   if (ci == NULL)
315     return (-1);
316
317   if (ci->values_num == 0)
318     return (0);
319
320   if (side == HEAD)
321   {
322     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
323     {
324       assert (ci->next == NULL);
325       ci->next = cache_queue_head;
326       cache_queue_head = ci;
327
328       if (cache_queue_tail == NULL)
329         cache_queue_tail = cache_queue_head;
330
331       did_insert = 1;
332     }
333     else if (cache_queue_head == ci)
334     {
335       /* do nothing */
336     }
337     else /* enqueued, but not first entry */
338     {
339       cache_item_t *prev;
340
341       /* find previous entry */
342       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
343         if (prev->next == ci)
344           break;
345       assert (prev != NULL);
346
347       /* move to the front */
348       prev->next = ci->next;
349       ci->next = cache_queue_head;
350       cache_queue_head = ci;
351
352       /* check if we need to adapt the tail */
353       if (cache_queue_tail == ci)
354         cache_queue_tail = prev;
355     }
356   }
357   else /* (side == TAIL) */
358   {
359     /* We don't move values back in the list.. */
360     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
361       return (0);
362
363     assert (ci->next == NULL);
364
365     if (cache_queue_tail == NULL)
366       cache_queue_head = ci;
367     else
368       cache_queue_tail->next = ci;
369     cache_queue_tail = ci;
370
371     did_insert = 1;
372   }
373
374   ci->flags |= CI_FLAGS_IN_QUEUE;
375
376   if (did_insert)
377   {
378     pthread_mutex_lock (&stats_lock);
379     stats_queue_length++;
380     pthread_mutex_unlock (&stats_lock);
381   }
382
383   return (0);
384 } /* }}} int enqueue_cache_item */
385
386 /*
387  * tree_callback_flush:
388  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
389  * while this is in progress.
390  */
391 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
392     gpointer data)
393 {
394   cache_item_t *ci;
395   callback_flush_data_t *cfd;
396
397   ci = (cache_item_t *) value;
398   cfd = (callback_flush_data_t *) data;
399
400   if ((ci->last_flush_time <= cfd->abs_timeout)
401       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
402       && (ci->values_num > 0))
403   {
404     enqueue_cache_item (ci, TAIL);
405   }
406   else if ((do_shutdown != 0)
407       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
408       && (ci->values_num > 0))
409   {
410     enqueue_cache_item (ci, TAIL);
411   }
412   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
413       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
414       && (ci->values_num <= 0))
415   {
416     char **temp;
417
418     temp = (char **) realloc (cfd->keys,
419         sizeof (char *) * (cfd->keys_num + 1));
420     if (temp == NULL)
421     {
422       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
423       return (FALSE);
424     }
425     cfd->keys = temp;
426     /* Make really sure this points to the _same_ place */
427     assert ((char *) key == ci->file);
428     cfd->keys[cfd->keys_num] = (char *) key;
429     cfd->keys_num++;
430   }
431
432   return (FALSE);
433 } /* }}} gboolean tree_callback_flush */
434
435 static int flush_old_values (int max_age)
436 {
437   callback_flush_data_t cfd;
438   size_t k;
439
440   memset (&cfd, 0, sizeof (cfd));
441   /* Pass the current time as user data so that we don't need to call
442    * `time' for each node. */
443   cfd.now = time (NULL);
444   cfd.keys = NULL;
445   cfd.keys_num = 0;
446
447   if (max_age > 0)
448     cfd.abs_timeout = cfd.now - max_age;
449   else
450     cfd.abs_timeout = cfd.now + 1;
451
452   /* `tree_callback_flush' will return the keys of all values that haven't
453    * been touched in the last `config_flush_interval' seconds in `cfd'.
454    * The char*'s in this array point to the same memory as ci->file, so we
455    * don't need to free them separately. */
456   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
457
458   for (k = 0; k < cfd.keys_num; k++)
459   {
460     cache_item_t *ci;
461
462     /* This must not fail. */
463     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
464     assert (ci != NULL);
465
466     /* If we end up here with values available, something's seriously
467      * messed up. */
468     assert (ci->values_num == 0);
469
470     /* Remove the node from the tree */
471     g_tree_remove (cache_tree, cfd.keys[k]);
472     cfd.keys[k] = NULL;
473
474     /* Now free and clean up `ci'. */
475     free (ci->file);
476     ci->file = NULL;
477     free (ci);
478     ci = NULL;
479   } /* for (k = 0; k < cfd.keys_num; k++) */
480
481   if (cfd.keys != NULL)
482   {
483     free (cfd.keys);
484     cfd.keys = NULL;
485   }
486
487   return (0);
488 } /* int flush_old_values */
489
490 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
491 {
492   struct timeval now;
493   struct timespec next_flush;
494
495   gettimeofday (&now, NULL);
496   next_flush.tv_sec = now.tv_sec + config_flush_interval;
497   next_flush.tv_nsec = 1000 * now.tv_usec;
498
499   pthread_mutex_lock (&cache_lock);
500   while ((do_shutdown == 0) || (cache_queue_head != NULL))
501   {
502     cache_item_t *ci;
503     char *file;
504     char **values;
505     int values_num;
506     int status;
507     int i;
508
509     /* First, check if it's time to do the cache flush. */
510     gettimeofday (&now, NULL);
511     if ((now.tv_sec > next_flush.tv_sec)
512         || ((now.tv_sec == next_flush.tv_sec)
513           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
514     {
515       /* Flush all values that haven't been written in the last
516        * `config_write_interval' seconds. */
517       flush_old_values (config_write_interval);
518
519       /* Determine the time of the next cache flush. */
520       while (next_flush.tv_sec < now.tv_sec)
521         next_flush.tv_sec += config_flush_interval;
522     }
523
524     /* Now, check if there's something to store away. If not, wait until
525      * something comes in or it's time to do the cache flush. */
526     if (cache_queue_head == NULL)
527     {
528       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
529       if ((status != 0) && (status != ETIMEDOUT))
530       {
531         RRDD_LOG (LOG_ERR, "queue_thread_main: "
532             "pthread_cond_timedwait returned %i.", status);
533       }
534     }
535
536     /* We're about to shut down, so lets flush the entire tree. */
537     if ((do_shutdown != 0) && (cache_queue_head == NULL))
538       flush_old_values (/* max age = */ -1);
539
540     /* Check if a value has arrived. This may be NULL if we timed out or there
541      * was an interrupt such as a signal. */
542     if (cache_queue_head == NULL)
543       continue;
544
545     ci = cache_queue_head;
546
547     /* copy the relevant parts */
548     file = strdup (ci->file);
549     if (file == NULL)
550     {
551       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
552       continue;
553     }
554
555     values = ci->values;
556     values_num = ci->values_num;
557
558     ci->values = NULL;
559     ci->values_num = 0;
560
561     ci->last_flush_time = time (NULL);
562     ci->flags &= ~(CI_FLAGS_IN_QUEUE);
563
564     cache_queue_head = ci->next;
565     if (cache_queue_head == NULL)
566       cache_queue_tail = NULL;
567     ci->next = NULL;
568
569     pthread_mutex_lock (&stats_lock);
570     assert (stats_queue_length > 0);
571     stats_queue_length--;
572     pthread_mutex_unlock (&stats_lock);
573
574     pthread_mutex_unlock (&cache_lock);
575
576     status = rrd_update_r (file, NULL, values_num, (void *) values);
577     if (status != 0)
578     {
579       RRDD_LOG (LOG_ERR, "queue_thread_main: "
580           "rrd_update_r failed with status %i.",
581           status);
582     }
583
584     free (file);
585     for (i = 0; i < values_num; i++)
586       free (values[i]);
587
588     if (status == 0)
589     {
590       pthread_mutex_lock (&stats_lock);
591       stats_updates_written++;
592       stats_data_sets_written += values_num;
593       pthread_mutex_unlock (&stats_lock);
594     }
595
596     pthread_mutex_lock (&cache_lock);
597     pthread_cond_broadcast (&flush_cond);
598
599     /* We're about to shut down, so lets flush the entire tree. */
600     if ((do_shutdown != 0) && (cache_queue_head == NULL))
601       flush_old_values (/* max age = */ -1);
602   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
603   pthread_mutex_unlock (&cache_lock);
604
605   return (NULL);
606 } /* }}} void *queue_thread_main */
607
608 static int buffer_get_field (char **buffer_ret, /* {{{ */
609     size_t *buffer_size_ret, char **field_ret)
610 {
611   char *buffer;
612   size_t buffer_pos;
613   size_t buffer_size;
614   char *field;
615   size_t field_size;
616   int status;
617
618   buffer = *buffer_ret;
619   buffer_pos = 0;
620   buffer_size = *buffer_size_ret;
621   field = *buffer_ret;
622   field_size = 0;
623
624   if (buffer_size <= 0)
625     return (-1);
626
627   /* This is ensured by `handle_request'. */
628   assert (buffer[buffer_size - 1] == ' ');
629
630   status = -1;
631   while (buffer_pos < buffer_size)
632   {
633     /* Check for end-of-field or end-of-buffer */
634     if (buffer[buffer_pos] == ' ')
635     {
636       field[field_size] = 0;
637       field_size++;
638       buffer_pos++;
639       status = 0;
640       break;
641     }
642     /* Handle escaped characters. */
643     else if (buffer[buffer_pos] == '\\')
644     {
645       if (buffer_pos >= (buffer_size - 1))
646         break;
647       buffer_pos++;
648       field[field_size] = buffer[buffer_pos];
649       field_size++;
650       buffer_pos++;
651     }
652     /* Normal operation */ 
653     else
654     {
655       field[field_size] = buffer[buffer_pos];
656       field_size++;
657       buffer_pos++;
658     }
659   } /* while (buffer_pos < buffer_size) */
660
661   if (status != 0)
662     return (status);
663
664   *buffer_ret = buffer + buffer_pos;
665   *buffer_size_ret = buffer_size - buffer_pos;
666   *field_ret = field;
667
668   return (0);
669 } /* }}} int buffer_get_field */
670
671 static int flush_file (const char *filename) /* {{{ */
672 {
673   cache_item_t *ci;
674
675   pthread_mutex_lock (&cache_lock);
676
677   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
678   if (ci == NULL)
679   {
680     pthread_mutex_unlock (&cache_lock);
681     return (ENOENT);
682   }
683
684   /* Enqueue at head */
685   enqueue_cache_item (ci, HEAD);
686   pthread_cond_signal (&cache_cond);
687
688   while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
689   {
690     ci = NULL;
691
692     pthread_cond_wait (&flush_cond, &cache_lock);
693
694     ci = g_tree_lookup (cache_tree, filename);
695     if (ci == NULL)
696     {
697       RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
698           "while waiting for flush.");
699       pthread_mutex_unlock (&cache_lock);
700       return (-1);
701     }
702   }
703
704   pthread_mutex_unlock (&cache_lock);
705   return (0);
706 } /* }}} int flush_file */
707
708 static int handle_request_help (int fd, /* {{{ */
709     char *buffer, size_t buffer_size)
710 {
711   int status;
712   char **help_text;
713   size_t help_text_len;
714   char *command;
715   size_t i;
716
717   char *help_help[] =
718   {
719     "4 Command overview\n",
720     "FLUSH <filename>\n",
721     "HELP [<command>]\n",
722     "UPDATE <filename> <values> [<values> ...]\n",
723     "STATS\n"
724   };
725   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
726
727   char *help_flush[] =
728   {
729     "4 Help for FLUSH\n",
730     "Usage: FLUSH <filename>\n",
731     "\n",
732     "Adds the given filename to the head of the update queue and returns\n",
733     "after is has been dequeued.\n"
734   };
735   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
736
737   char *help_update[] =
738   {
739     "9 Help for UPDATE\n",
740     "Usage: UPDATE <filename> <values> [<values> ...]\n"
741     "\n",
742     "Adds the given file to the internal cache if it is not yet known and\n",
743     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
744     "for details.\n",
745     "\n",
746     "Each <values> has the following form:\n",
747     "  <values> = <time>:<value>[:<value>[...]]\n",
748     "See the rrdupdate(1) manpage for details.\n"
749   };
750   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
751
752   char *help_stats[] =
753   {
754     "4 Help for STATS\n",
755     "Usage: STATS\n",
756     "\n",
757     "Returns some performance counters, see the rrdcached(1) manpage for\n",
758     "a description of the values.\n"
759   };
760   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
761
762   status = buffer_get_field (&buffer, &buffer_size, &command);
763   if (status != 0)
764   {
765     help_text = help_help;
766     help_text_len = help_help_len;
767   }
768   else
769   {
770     if (strcasecmp (command, "update") == 0)
771     {
772       help_text = help_update;
773       help_text_len = help_update_len;
774     }
775     else if (strcasecmp (command, "flush") == 0)
776     {
777       help_text = help_flush;
778       help_text_len = help_flush_len;
779     }
780     else if (strcasecmp (command, "stats") == 0)
781     {
782       help_text = help_stats;
783       help_text_len = help_stats_len;
784     }
785     else
786     {
787       help_text = help_help;
788       help_text_len = help_help_len;
789     }
790   }
791
792   for (i = 0; i < help_text_len; i++)
793   {
794     status = swrite (fd, help_text[i], strlen (help_text[i]));
795     if (status < 0)
796     {
797       status = errno;
798       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
799       return (status);
800     }
801   }
802
803   return (0);
804 } /* }}} int handle_request_help */
805
806 static int handle_request_stats (int fd, /* {{{ */
807     char *buffer __attribute__((unused)),
808     size_t buffer_size __attribute__((unused)))
809 {
810   int status;
811   char outbuf[4096];
812
813   uint64_t copy_queue_length;
814   uint64_t copy_updates_written;
815   uint64_t copy_data_sets_written;
816
817   uint64_t tree_nodes_number;
818   uint64_t tree_depth;
819
820   pthread_mutex_lock (&stats_lock);
821   copy_queue_length       = stats_queue_length;
822   copy_updates_written    = stats_updates_written;
823   copy_data_sets_written  = stats_data_sets_written;
824   pthread_mutex_unlock (&stats_lock);
825
826   pthread_mutex_lock (&cache_lock);
827   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
828   tree_depth        = (uint64_t) g_tree_height (cache_tree);
829   pthread_mutex_unlock (&cache_lock);
830
831 #define RRDD_STATS_SEND \
832   outbuf[sizeof (outbuf) - 1] = 0; \
833   status = swrite (fd, outbuf, strlen (outbuf)); \
834   if (status < 0) \
835   { \
836     status = errno; \
837     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
838     return (status); \
839   }
840
841   strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf));
842   RRDD_STATS_SEND;
843
844   snprintf (outbuf, sizeof (outbuf),
845       "QueueLength: %"PRIu64"\n", copy_queue_length);
846   RRDD_STATS_SEND;
847
848   snprintf (outbuf, sizeof (outbuf),
849       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
850   RRDD_STATS_SEND;
851
852   snprintf (outbuf, sizeof (outbuf),
853       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
854   RRDD_STATS_SEND;
855
856   snprintf (outbuf, sizeof (outbuf),
857       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
858   RRDD_STATS_SEND;
859
860   snprintf (outbuf, sizeof (outbuf),
861       "TreeDepth: %"PRIu64"\n", tree_depth);
862   RRDD_STATS_SEND;
863
864   return (0);
865 #undef RRDD_STATS_SEND
866 } /* }}} int handle_request_stats */
867
868 static int handle_request_flush (int fd, /* {{{ */
869     char *buffer, size_t buffer_size)
870 {
871   char *file;
872   int status;
873   char result[4096];
874
875   status = buffer_get_field (&buffer, &buffer_size, &file);
876   if (status != 0)
877   {
878     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
879   }
880   else
881   {
882     status = flush_file (file);
883     if (status == 0)
884       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
885     else if (status == ENOENT)
886       snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
887     else if (status < 0)
888       strncpy (result, "-1 Internal error.\n", sizeof (result));
889     else
890       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
891   }
892   result[sizeof (result) - 1] = 0;
893
894   status = swrite (fd, result, strlen (result));
895   if (status < 0)
896   {
897     status = errno;
898     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
899     return (status);
900   }
901
902   return (0);
903 } /* }}} int handle_request_flush */
904
905 static int handle_request_update (int fd, /* {{{ */
906     char *buffer, size_t buffer_size)
907 {
908   char *file;
909   int values_num = 0;
910   int status;
911
912   time_t now;
913
914   cache_item_t *ci;
915   char answer[4096];
916
917 #define RRDD_UPDATE_SEND \
918   answer[sizeof (answer) - 1] = 0; \
919   status = swrite (fd, answer, strlen (answer)); \
920   if (status < 0) \
921   { \
922     status = errno; \
923     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
924     return (status); \
925   }
926
927   now = time (NULL);
928
929   status = buffer_get_field (&buffer, &buffer_size, &file);
930   if (status != 0)
931   {
932     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
933         sizeof (answer));
934     RRDD_UPDATE_SEND;
935     return (0);
936   }
937
938   pthread_mutex_lock (&cache_lock);
939
940   ci = g_tree_lookup (cache_tree, file);
941   if (ci == NULL) /* {{{ */
942   {
943     struct stat statbuf;
944
945     memset (&statbuf, 0, sizeof (statbuf));
946     status = stat (file, &statbuf);
947     if (status != 0)
948     {
949       pthread_mutex_unlock (&cache_lock);
950       RRDD_LOG (LOG_ERR, "handle_request_update: stat (%s) failed.", file);
951
952       status = errno;
953       if (status == ENOENT)
954         snprintf (answer, sizeof (answer), "-1 No such file: %s", file);
955       else
956         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
957             status);
958       RRDD_UPDATE_SEND;
959       return (0);
960     }
961     if (!S_ISREG (statbuf.st_mode))
962     {
963       pthread_mutex_unlock (&cache_lock);
964
965       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s", file);
966       RRDD_UPDATE_SEND;
967       return (0);
968     }
969
970     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
971     if (ci == NULL)
972     {
973       pthread_mutex_unlock (&cache_lock);
974       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
975
976       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
977       RRDD_UPDATE_SEND;
978       return (0);
979     }
980     memset (ci, 0, sizeof (cache_item_t));
981
982     ci->file = strdup (file);
983     if (ci->file == NULL)
984     {
985       pthread_mutex_unlock (&cache_lock);
986       free (ci);
987       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
988
989       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
990       RRDD_UPDATE_SEND;
991       return (0);
992     }
993
994     ci->values = NULL;
995     ci->values_num = 0;
996     ci->last_flush_time = now;
997     ci->flags = CI_FLAGS_IN_TREE;
998
999     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1000   } /* }}} */
1001   assert (ci != NULL);
1002
1003   while (buffer_size > 0)
1004   {
1005     char **temp;
1006     char *value;
1007
1008     status = buffer_get_field (&buffer, &buffer_size, &value);
1009     if (status != 0)
1010     {
1011       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1012       break;
1013     }
1014
1015     temp = (char **) realloc (ci->values,
1016         sizeof (char *) * (ci->values_num + 1));
1017     if (temp == NULL)
1018     {
1019       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1020       continue;
1021     }
1022     ci->values = temp;
1023
1024     ci->values[ci->values_num] = strdup (value);
1025     if (ci->values[ci->values_num] == NULL)
1026     {
1027       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1028       continue;
1029     }
1030     ci->values_num++;
1031
1032     values_num++;
1033   }
1034
1035   if (((now - ci->last_flush_time) >= config_write_interval)
1036       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1037       && (ci->values_num > 0))
1038   {
1039     enqueue_cache_item (ci, TAIL);
1040     pthread_cond_signal (&cache_cond);
1041   }
1042
1043   pthread_mutex_unlock (&cache_lock);
1044
1045   if (values_num < 1)
1046   {
1047     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1048   }
1049   else
1050   {
1051     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1052         (values_num == 1) ? "" : "s");
1053   }
1054   RRDD_UPDATE_SEND;
1055   return (0);
1056 #undef RRDD_UPDATE_SEND
1057 } /* }}} int handle_request_update */
1058
1059 static int handle_request (int fd) /* {{{ */
1060 {
1061   char buffer[4096];
1062   size_t buffer_size;
1063   char *buffer_ptr;
1064   char *command;
1065   int status;
1066
1067   status = (int) sread (fd, buffer, sizeof (buffer));
1068   if (status == 0)
1069   {
1070     return (1);
1071   }
1072   else if (status < 0)
1073   {
1074     RRDD_LOG (LOG_ERR, "handle_request: sread failed.");
1075     return (-1);
1076   }
1077   buffer_size = (size_t) status;
1078   assert (buffer_size <= sizeof (buffer));
1079   assert (buffer[buffer_size - 1] == 0);
1080
1081   /* Place the normal field separator at the end to simplify
1082    * `buffer_get_field's work. */
1083   buffer[buffer_size - 1] = ' ';
1084
1085   buffer_ptr = buffer;
1086   command = NULL;
1087   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1088   if (status != 0)
1089   {
1090     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1091     return (-1);
1092   }
1093
1094   if (strcasecmp (command, "update") == 0)
1095   {
1096     return (handle_request_update (fd, buffer_ptr, buffer_size));
1097   }
1098   else if (strcasecmp (command, "flush") == 0)
1099   {
1100     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1101   }
1102   else if (strcasecmp (command, "stats") == 0)
1103   {
1104     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1105   }
1106   else if (strcasecmp (command, "help") == 0)
1107   {
1108     return (handle_request_help (fd, buffer_ptr, buffer_size));
1109   }
1110   else
1111   {
1112     char result[4096];
1113
1114     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1115     result[sizeof (result) - 1] = 0;
1116
1117     status = swrite (fd, result, strlen (result));
1118     if (status < 0)
1119     {
1120       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1121       return (-1);
1122     }
1123   }
1124
1125   return (0);
1126 } /* }}} int handle_request */
1127
1128 static void *connection_thread_main (void *args /* {{{ */
1129     __attribute__((unused)))
1130 {
1131   pthread_t self;
1132   int i;
1133   int fd;
1134   
1135   fd = *((int *) args);
1136
1137   pthread_mutex_lock (&connetion_threads_lock);
1138   {
1139     pthread_t *temp;
1140
1141     temp = (pthread_t *) realloc (connetion_threads,
1142         sizeof (pthread_t) * (connetion_threads_num + 1));
1143     if (temp == NULL)
1144     {
1145       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1146     }
1147     else
1148     {
1149       connetion_threads = temp;
1150       connetion_threads[connetion_threads_num] = pthread_self ();
1151       connetion_threads_num++;
1152     }
1153   }
1154   pthread_mutex_unlock (&connetion_threads_lock);
1155
1156   while (do_shutdown == 0)
1157   {
1158     struct pollfd pollfd;
1159     int status;
1160
1161     pollfd.fd = fd;
1162     pollfd.events = POLLIN | POLLPRI;
1163     pollfd.revents = 0;
1164
1165     status = poll (&pollfd, 1, /* timeout = */ 500);
1166     if (status == 0) /* timeout */
1167       continue;
1168     else if (status < 0) /* error */
1169     {
1170       status = errno;
1171       if (status == EINTR)
1172         continue;
1173       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1174       continue;
1175     }
1176
1177     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1178     {
1179       close (fd);
1180       break;
1181     }
1182     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1183     {
1184       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1185           "poll(2) returned something unexpected: %#04hx",
1186           pollfd.revents);
1187       close (fd);
1188       break;
1189     }
1190
1191     status = handle_request (fd);
1192     if (status != 0)
1193     {
1194       close (fd);
1195       break;
1196     }
1197   }
1198
1199   self = pthread_self ();
1200   /* Remove this thread from the connection threads list */
1201   pthread_mutex_lock (&connetion_threads_lock);
1202   /* Find out own index in the array */
1203   for (i = 0; i < connetion_threads_num; i++)
1204     if (pthread_equal (connetion_threads[i], self) != 0)
1205       break;
1206   assert (i < connetion_threads_num);
1207
1208   /* Move the trailing threads forward. */
1209   if (i < (connetion_threads_num - 1))
1210   {
1211     memmove (connetion_threads + i,
1212         connetion_threads + i + 1,
1213         sizeof (pthread_t) * (connetion_threads_num - i - 1));
1214   }
1215
1216   connetion_threads_num--;
1217   pthread_mutex_unlock (&connetion_threads_lock);
1218
1219   free (args);
1220   return (NULL);
1221 } /* }}} void *connection_thread_main */
1222
1223 static int open_listen_socket_unix (const char *path) /* {{{ */
1224 {
1225   int fd;
1226   struct sockaddr_un sa;
1227   listen_socket_t *temp;
1228   int status;
1229
1230   temp = (listen_socket_t *) realloc (listen_fds,
1231       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1232   if (temp == NULL)
1233   {
1234     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1235     return (-1);
1236   }
1237   listen_fds = temp;
1238   memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1239
1240   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1241   if (fd < 0)
1242   {
1243     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1244     return (-1);
1245   }
1246
1247   memset (&sa, 0, sizeof (sa));
1248   sa.sun_family = AF_UNIX;
1249   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1250
1251   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1252   if (status != 0)
1253   {
1254     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1255     close (fd);
1256     unlink (path);
1257     return (-1);
1258   }
1259
1260   status = listen (fd, /* backlog = */ 10);
1261   if (status != 0)
1262   {
1263     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1264     close (fd);
1265     unlink (path);
1266     return (-1);
1267   }
1268   
1269   listen_fds[listen_fds_num].fd = fd;
1270   snprintf (listen_fds[listen_fds_num].path,
1271       sizeof (listen_fds[listen_fds_num].path) - 1,
1272       "unix:%s", path);
1273   listen_fds_num++;
1274
1275   return (0);
1276 } /* }}} int open_listen_socket_unix */
1277
1278 static int open_listen_socket (const char *addr) /* {{{ */
1279 {
1280   struct addrinfo ai_hints;
1281   struct addrinfo *ai_res;
1282   struct addrinfo *ai_ptr;
1283   int status;
1284
1285   assert (addr != NULL);
1286
1287   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
1288     return (open_listen_socket_unix (addr + strlen ("unix:")));
1289   else if (addr[0] == '/')
1290     return (open_listen_socket_unix (addr));
1291
1292   memset (&ai_hints, 0, sizeof (ai_hints));
1293   ai_hints.ai_flags = 0;
1294 #ifdef AI_ADDRCONFIG
1295   ai_hints.ai_flags |= AI_ADDRCONFIG;
1296 #endif
1297   ai_hints.ai_family = AF_UNSPEC;
1298   ai_hints.ai_socktype = SOCK_STREAM;
1299
1300   ai_res = NULL;
1301   status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
1302   if (status != 0)
1303   {
1304     RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
1305         "%s", addr, gai_strerror (status));
1306     return (-1);
1307   }
1308
1309   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1310   {
1311     int fd;
1312     listen_socket_t *temp;
1313
1314     temp = (listen_socket_t *) realloc (listen_fds,
1315         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1316     if (temp == NULL)
1317     {
1318       RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
1319       continue;
1320     }
1321     listen_fds = temp;
1322     memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
1323
1324     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1325     if (fd < 0)
1326     {
1327       RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
1328       continue;
1329     }
1330
1331     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1332     if (status != 0)
1333     {
1334       RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
1335       close (fd);
1336       continue;
1337     }
1338
1339     status = listen (fd, /* backlog = */ 10);
1340     if (status != 0)
1341     {
1342       RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
1343       close (fd);
1344       return (-1);
1345     }
1346
1347     listen_fds[listen_fds_num].fd = fd;
1348     strncpy (listen_fds[listen_fds_num].path, addr,
1349         sizeof (listen_fds[listen_fds_num].path) - 1);
1350     listen_fds_num++;
1351   } /* for (ai_ptr) */
1352
1353   return (0);
1354 } /* }}} int open_listen_socket */
1355
1356 static int close_listen_sockets (void) /* {{{ */
1357 {
1358   size_t i;
1359
1360   for (i = 0; i < listen_fds_num; i++)
1361   {
1362     close (listen_fds[i].fd);
1363     if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
1364       unlink (listen_fds[i].path + strlen ("unix:"));
1365   }
1366
1367   free (listen_fds);
1368   listen_fds = NULL;
1369   listen_fds_num = 0;
1370
1371   return (0);
1372 } /* }}} int close_listen_sockets */
1373
1374 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1375 {
1376   struct pollfd *pollfds;
1377   int pollfds_num;
1378   int status;
1379   int i;
1380
1381   for (i = 0; i < config_listen_address_list_len; i++)
1382     open_listen_socket (config_listen_address_list[i]);
1383
1384   if (config_listen_address_list_len < 1)
1385     open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
1386
1387   if (listen_fds_num < 1)
1388   {
1389     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1390         "could be opened. Sorry.");
1391     return (NULL);
1392   }
1393
1394   pollfds_num = listen_fds_num;
1395   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1396   if (pollfds == NULL)
1397   {
1398     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1399     return (NULL);
1400   }
1401   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1402
1403   while (do_shutdown == 0)
1404   {
1405     assert (pollfds_num == ((int) listen_fds_num));
1406     for (i = 0; i < pollfds_num; i++)
1407     {
1408       pollfds[i].fd = listen_fds[i].fd;
1409       pollfds[i].events = POLLIN | POLLPRI;
1410       pollfds[i].revents = 0;
1411     }
1412
1413     status = poll (pollfds, pollfds_num, /* timeout = */ -1);
1414     if (status < 1)
1415     {
1416       status = errno;
1417       if (status != EINTR)
1418       {
1419         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1420       }
1421       continue;
1422     }
1423
1424     for (i = 0; i < pollfds_num; i++)
1425     {
1426       int *client_sd;
1427       struct sockaddr_storage client_sa;
1428       socklen_t client_sa_size;
1429       pthread_t tid;
1430       pthread_attr_t attr;
1431
1432       if (pollfds[i].revents == 0)
1433         continue;
1434
1435       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1436       {
1437         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1438             "poll(2) returned something unexpected for listen FD #%i.",
1439             pollfds[i].fd);
1440         continue;
1441       }
1442
1443       client_sd = (int *) malloc (sizeof (int));
1444       if (client_sd == NULL)
1445       {
1446         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1447         continue;
1448       }
1449
1450       client_sa_size = sizeof (client_sa);
1451       *client_sd = accept (pollfds[i].fd,
1452           (struct sockaddr *) &client_sa, &client_sa_size);
1453       if (*client_sd < 0)
1454       {
1455         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1456         continue;
1457       }
1458
1459       pthread_attr_init (&attr);
1460       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1461
1462       status = pthread_create (&tid, &attr, connection_thread_main,
1463           /* args = */ (void *) client_sd);
1464       if (status != 0)
1465       {
1466         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1467         close (*client_sd);
1468         free (client_sd);
1469         continue;
1470       }
1471     } /* for (pollfds_num) */
1472   } /* while (do_shutdown == 0) */
1473
1474   close_listen_sockets ();
1475
1476   pthread_mutex_lock (&connetion_threads_lock);
1477   while (connetion_threads_num > 0)
1478   {
1479     pthread_t wait_for;
1480
1481     wait_for = connetion_threads[0];
1482
1483     pthread_mutex_unlock (&connetion_threads_lock);
1484     pthread_join (wait_for, /* retval = */ NULL);
1485     pthread_mutex_lock (&connetion_threads_lock);
1486   }
1487   pthread_mutex_unlock (&connetion_threads_lock);
1488
1489   return (NULL);
1490 } /* }}} void *listen_thread_main */
1491
1492 static int daemonize (void) /* {{{ */
1493 {
1494   pid_t child;
1495   int status;
1496   char *base_dir;
1497
1498   /* These structures are static, because `sigaction' behaves weird if the are
1499    * overwritten.. */
1500   static struct sigaction sa_int;
1501   static struct sigaction sa_term;
1502   static struct sigaction sa_pipe;
1503
1504   child = fork ();
1505   if (child < 0)
1506   {
1507     fprintf (stderr, "daemonize: fork(2) failed.\n");
1508     return (-1);
1509   }
1510   else if (child > 0)
1511   {
1512     return (1);
1513   }
1514
1515   /* Change into the /tmp directory. */
1516   base_dir = (config_base_dir != NULL)
1517     ? config_base_dir
1518     : "/tmp";
1519   status = chdir (base_dir);
1520   if (status != 0)
1521   {
1522     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
1523     return (-1);
1524   }
1525
1526   /* Become session leader */
1527   setsid ();
1528
1529   /* Open the first three file descriptors to /dev/null */
1530   close (2);
1531   close (1);
1532   close (0);
1533
1534   open ("/dev/null", O_RDWR);
1535   dup (0);
1536   dup (0);
1537
1538   /* Install signal handlers */
1539   memset (&sa_int, 0, sizeof (sa_int));
1540   sa_int.sa_handler = sig_int_handler;
1541   sigaction (SIGINT, &sa_int, NULL);
1542
1543   memset (&sa_term, 0, sizeof (sa_term));
1544   sa_term.sa_handler = sig_term_handler;
1545   sigaction (SIGTERM, &sa_term, NULL);
1546
1547   memset (&sa_pipe, 0, sizeof (sa_pipe));
1548   sa_pipe.sa_handler = SIG_IGN;
1549   sigaction (SIGPIPE, &sa_pipe, NULL);
1550
1551   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
1552
1553   cache_tree = g_tree_new ((GCompareFunc) strcmp);
1554   if (cache_tree == NULL)
1555   {
1556     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
1557     return (-1);
1558   }
1559
1560   memset (&queue_thread, 0, sizeof (queue_thread));
1561   status = pthread_create (&queue_thread, /* attr = */ NULL,
1562       queue_thread_main, /* args = */ NULL);
1563   if (status != 0)
1564   {
1565     RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
1566     return (-1);
1567   }
1568
1569   write_pidfile ();
1570
1571   return (0);
1572 } /* }}} int daemonize */
1573
1574 static int cleanup (void) /* {{{ */
1575 {
1576   do_shutdown++;
1577
1578   pthread_cond_signal (&cache_cond);
1579   pthread_join (queue_thread, /* return = */ NULL);
1580
1581   remove_pidfile ();
1582
1583   closelog ();
1584
1585   return (0);
1586 } /* }}} int cleanup */
1587
1588 static int read_options (int argc, char **argv) /* {{{ */
1589 {
1590   int option;
1591   int status = 0;
1592
1593   while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
1594   {
1595     switch (option)
1596     {
1597       case 'l':
1598       {
1599         char **temp;
1600
1601         temp = (char **) realloc (config_listen_address_list,
1602             sizeof (char *) * (config_listen_address_list_len + 1));
1603         if (temp == NULL)
1604         {
1605           fprintf (stderr, "read_options: realloc failed.\n");
1606           return (2);
1607         }
1608         config_listen_address_list = temp;
1609
1610         temp[config_listen_address_list_len] = strdup (optarg);
1611         if (temp[config_listen_address_list_len] == NULL)
1612         {
1613           fprintf (stderr, "read_options: strdup failed.\n");
1614           return (2);
1615         }
1616         config_listen_address_list_len++;
1617       }
1618       break;
1619
1620       case 'f':
1621       {
1622         int temp;
1623
1624         temp = atoi (optarg);
1625         if (temp > 0)
1626           config_flush_interval = temp;
1627         else
1628         {
1629           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
1630           status = 3;
1631         }
1632       }
1633       break;
1634
1635       case 'w':
1636       {
1637         int temp;
1638
1639         temp = atoi (optarg);
1640         if (temp > 0)
1641           config_write_interval = temp;
1642         else
1643         {
1644           fprintf (stderr, "Invalid write interval: %s\n", optarg);
1645           status = 2;
1646         }
1647       }
1648       break;
1649
1650       case 'b':
1651       {
1652         size_t len;
1653
1654         if (config_base_dir != NULL)
1655           free (config_base_dir);
1656         config_base_dir = strdup (optarg);
1657         if (config_base_dir == NULL)
1658         {
1659           fprintf (stderr, "read_options: strdup failed.\n");
1660           return (3);
1661         }
1662
1663         len = strlen (config_base_dir);
1664         while ((len > 0) && (config_base_dir[len - 1] == '/'))
1665         {
1666           config_base_dir[len - 1] = 0;
1667           len--;
1668         }
1669
1670         if (len < 1)
1671         {
1672           fprintf (stderr, "Invalid base directory: %s\n", optarg);
1673           return (4);
1674         }
1675       }
1676       break;
1677
1678       case 'p':
1679       {
1680         if (config_pid_file != NULL)
1681           free (config_pid_file);
1682         config_pid_file = strdup (optarg);
1683         if (config_pid_file == NULL)
1684         {
1685           fprintf (stderr, "read_options: strdup failed.\n");
1686           return (3);
1687         }
1688       }
1689       break;
1690
1691       case 'h':
1692       case '?':
1693         printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
1694             "\n"
1695             "Usage: rrdcached [options]\n"
1696             "\n"
1697             "Valid options are:\n"
1698             "  -l <address>  Socket address to listen to.\n"
1699             "  -w <seconds>  Interval in which to write data.\n"
1700             "  -f <seconds>  Interval in which to flush dead data.\n"
1701             "  -p <file>     Location of the PID-file.\n"
1702             "  -b <dir>      Base directory to change to.\n"
1703             "\n"
1704             "For more information and a detailed description of all options "
1705             "please refer\n"
1706             "to the rrdcached(1) manual page.\n",
1707             VERSION);
1708         status = -1;
1709         break;
1710     } /* switch (option) */
1711   } /* while (getopt) */
1712
1713   return (status);
1714 } /* }}} int read_options */
1715
1716 int main (int argc, char **argv)
1717 {
1718   int status;
1719
1720   status = read_options (argc, argv);
1721   if (status != 0)
1722   {
1723     if (status < 0)
1724       status = 0;
1725     return (status);
1726   }
1727
1728   status = daemonize ();
1729   if (status == 1)
1730   {
1731     struct sigaction sigchld;
1732
1733     memset (&sigchld, 0, sizeof (sigchld));
1734     sigchld.sa_handler = SIG_IGN;
1735     sigaction (SIGCHLD, &sigchld, NULL);
1736
1737     return (0);
1738   }
1739   else if (status != 0)
1740   {
1741     fprintf (stderr, "daemonize failed, exiting.\n");
1742     return (1);
1743   }
1744
1745   listen_thread_main (NULL);
1746
1747   cleanup ();
1748
1749   return (0);
1750 } /* int main */
1751
1752 /*
1753  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
1754  */