The daemon should behave the same way w/r/t files whether we "-g" or not.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 struct listen_socket_s
111 {
112   int fd;
113   char addr[PATH_MAX + 1];
114   int family;
115   socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
118
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
122 {
123   char *file;
124   char **values;
125   int values_num;
126   time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE  (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129   int flags;
130   pthread_cond_t  flushed;
131   cache_item_t *next;
132 };
133
134 struct callback_flush_data_s
135 {
136   time_t now;
137   time_t abs_timeout;
138   char **keys;
139   size_t keys_num;
140 };
141 typedef struct callback_flush_data_s callback_flush_data_t;
142
143 enum queue_side_e
144 {
145   HEAD,
146   TAIL
147 };
148 typedef enum queue_side_e queue_side_t;
149
150 /* max length of socket command or response */
151 #define CMD_MAX 4096
152
153 /*
154  * Variables
155  */
156 static int stay_foreground = 0;
157
158 static listen_socket_t *listen_fds = NULL;
159 static size_t listen_fds_num = 0;
160
161 static int do_shutdown = 0;
162
163 static pthread_t queue_thread;
164
165 static pthread_t *connection_threads = NULL;
166 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
167 static int connection_threads_num = 0;
168
169 /* Cache stuff */
170 static GTree          *cache_tree = NULL;
171 static cache_item_t   *cache_queue_head = NULL;
172 static cache_item_t   *cache_queue_tail = NULL;
173 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
174 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
175
176 static int config_write_interval = 300;
177 static int config_write_jitter   = 0;
178 static int config_flush_interval = 3600;
179 static int config_flush_at_shutdown = 0;
180 static char *config_pid_file = NULL;
181 static char *config_base_dir = NULL;
182
183 static listen_socket_t **config_listen_address_list = NULL;
184 static int config_listen_address_list_len = 0;
185
186 static uint64_t stats_queue_length = 0;
187 static uint64_t stats_updates_received = 0;
188 static uint64_t stats_flush_received = 0;
189 static uint64_t stats_updates_written = 0;
190 static uint64_t stats_data_sets_written = 0;
191 static uint64_t stats_journal_bytes = 0;
192 static uint64_t stats_journal_rotate = 0;
193 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
194
195 /* Journaled updates */
196 static char *journal_cur = NULL;
197 static char *journal_old = NULL;
198 static FILE *journal_fh = NULL;
199 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
200 static int journal_write(char *cmd, char *args);
201 static void journal_done(void);
202 static void journal_rotate(void);
203
204 /* 
205  * Functions
206  */
207 static void sig_common (const char *sig) /* {{{ */
208 {
209   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
210   do_shutdown++;
211   pthread_cond_broadcast(&cache_cond);
212 } /* }}} void sig_common */
213
214 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
215 {
216   sig_common("INT");
217 } /* }}} void sig_int_handler */
218
219 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
220 {
221   sig_common("TERM");
222 } /* }}} void sig_term_handler */
223
224 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
225 {
226   config_flush_at_shutdown = 1;
227   sig_common("USR1");
228 } /* }}} void sig_usr1_handler */
229
230 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
231 {
232   config_flush_at_shutdown = 0;
233   sig_common("USR2");
234 } /* }}} void sig_usr2_handler */
235
236 static void install_signal_handlers(void) /* {{{ */
237 {
238   /* These structures are static, because `sigaction' behaves weird if the are
239    * overwritten.. */
240   static struct sigaction sa_int;
241   static struct sigaction sa_term;
242   static struct sigaction sa_pipe;
243   static struct sigaction sa_usr1;
244   static struct sigaction sa_usr2;
245
246   /* Install signal handlers */
247   memset (&sa_int, 0, sizeof (sa_int));
248   sa_int.sa_handler = sig_int_handler;
249   sigaction (SIGINT, &sa_int, NULL);
250
251   memset (&sa_term, 0, sizeof (sa_term));
252   sa_term.sa_handler = sig_term_handler;
253   sigaction (SIGTERM, &sa_term, NULL);
254
255   memset (&sa_pipe, 0, sizeof (sa_pipe));
256   sa_pipe.sa_handler = SIG_IGN;
257   sigaction (SIGPIPE, &sa_pipe, NULL);
258
259   memset (&sa_pipe, 0, sizeof (sa_usr1));
260   sa_usr1.sa_handler = sig_usr1_handler;
261   sigaction (SIGUSR1, &sa_usr1, NULL);
262
263   memset (&sa_usr2, 0, sizeof (sa_usr2));
264   sa_usr2.sa_handler = sig_usr2_handler;
265   sigaction (SIGUSR2, &sa_usr2, NULL);
266
267 } /* }}} void install_signal_handlers */
268
269 static int open_pidfile(void) /* {{{ */
270 {
271   int fd;
272   char *file;
273
274   file = (config_pid_file != NULL)
275     ? config_pid_file
276     : LOCALSTATEDIR "/run/rrdcached.pid";
277
278   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
279   if (fd < 0)
280     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
281             file, rrd_strerror(errno));
282
283   return(fd);
284 } /* }}} static int open_pidfile */
285
286 static int write_pidfile (int fd) /* {{{ */
287 {
288   pid_t pid;
289   FILE *fh;
290
291   pid = getpid ();
292
293   fh = fdopen (fd, "w");
294   if (fh == NULL)
295   {
296     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
297     close(fd);
298     return (-1);
299   }
300
301   fprintf (fh, "%i\n", (int) pid);
302   fclose (fh);
303
304   return (0);
305 } /* }}} int write_pidfile */
306
307 static int remove_pidfile (void) /* {{{ */
308 {
309   char *file;
310   int status;
311
312   file = (config_pid_file != NULL)
313     ? config_pid_file
314     : LOCALSTATEDIR "/run/rrdcached.pid";
315
316   status = unlink (file);
317   if (status == 0)
318     return (0);
319   return (errno);
320 } /* }}} int remove_pidfile */
321
322 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
323 {
324   char    *buffer;
325   size_t   buffer_used;
326   size_t   buffer_free;
327   ssize_t  status;
328
329   buffer       = (char *) buffer_void;
330   buffer_used  = 0;
331   buffer_free  = buffer_size;
332
333   while (buffer_free > 0)
334   {
335     status = read (fd, buffer + buffer_used, buffer_free);
336     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
337       continue;
338
339     if (status < 0)
340       return (-1);
341
342     if (status == 0)
343       return (0);
344
345     assert ((0 > status) || (buffer_free >= (size_t) status));
346
347     buffer_free = buffer_free - status;
348     buffer_used = buffer_used + status;
349
350     if (buffer[buffer_used - 1] == '\n')
351       break;
352   }
353
354   assert (buffer_used > 0);
355
356   if (buffer[buffer_used - 1] != '\n')
357   {
358     errno = ENOBUFS;
359     return (-1);
360   }
361
362   buffer[buffer_used - 1] = 0;
363
364   /* Fix network line endings. */
365   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
366   {
367     buffer_used--;
368     buffer[buffer_used - 1] = 0;
369   }
370
371   return (buffer_used);
372 } /* }}} ssize_t sread */
373
374 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
375 {
376   const char *ptr;
377   size_t      nleft;
378   ssize_t     status;
379
380   /* special case for journal replay */
381   if (fd < 0) return 0;
382
383   ptr   = (const char *) buf;
384   nleft = count;
385
386   while (nleft > 0)
387   {
388     status = write (fd, (const void *) ptr, nleft);
389
390     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
391       continue;
392
393     if (status < 0)
394       return (status);
395
396     nleft -= status;
397     ptr   += status;
398   }
399
400   return (0);
401 } /* }}} ssize_t swrite */
402
403 static void _wipe_ci_values(cache_item_t *ci, time_t when)
404 {
405   ci->values = NULL;
406   ci->values_num = 0;
407
408   ci->last_flush_time = when;
409   if (config_write_jitter > 0)
410     ci->last_flush_time += (random() % config_write_jitter);
411
412   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
413 }
414
415 /*
416  * enqueue_cache_item:
417  * `cache_lock' must be acquired before calling this function!
418  */
419 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
420     queue_side_t side)
421 {
422   int did_insert = 0;
423
424   if (ci == NULL)
425     return (-1);
426
427   if (ci->values_num == 0)
428     return (0);
429
430   if (side == HEAD)
431   {
432     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
433     {
434       assert (ci->next == NULL);
435       ci->next = cache_queue_head;
436       cache_queue_head = ci;
437
438       if (cache_queue_tail == NULL)
439         cache_queue_tail = cache_queue_head;
440
441       did_insert = 1;
442     }
443     else if (cache_queue_head == ci)
444     {
445       /* do nothing */
446     }
447     else /* enqueued, but not first entry */
448     {
449       cache_item_t *prev;
450
451       /* find previous entry */
452       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
453         if (prev->next == ci)
454           break;
455       assert (prev != NULL);
456
457       /* move to the front */
458       prev->next = ci->next;
459       ci->next = cache_queue_head;
460       cache_queue_head = ci;
461
462       /* check if we need to adapt the tail */
463       if (cache_queue_tail == ci)
464         cache_queue_tail = prev;
465     }
466   }
467   else /* (side == TAIL) */
468   {
469     /* We don't move values back in the list.. */
470     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
471       return (0);
472
473     assert (ci->next == NULL);
474
475     if (cache_queue_tail == NULL)
476       cache_queue_head = ci;
477     else
478       cache_queue_tail->next = ci;
479     cache_queue_tail = ci;
480
481     did_insert = 1;
482   }
483
484   ci->flags |= CI_FLAGS_IN_QUEUE;
485
486   if (did_insert)
487   {
488     pthread_cond_broadcast(&cache_cond);
489     pthread_mutex_lock (&stats_lock);
490     stats_queue_length++;
491     pthread_mutex_unlock (&stats_lock);
492   }
493
494   return (0);
495 } /* }}} int enqueue_cache_item */
496
497 /*
498  * tree_callback_flush:
499  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
500  * while this is in progress.
501  */
502 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
503     gpointer data)
504 {
505   cache_item_t *ci;
506   callback_flush_data_t *cfd;
507
508   ci = (cache_item_t *) value;
509   cfd = (callback_flush_data_t *) data;
510
511   if ((ci->last_flush_time <= cfd->abs_timeout)
512       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
513       && (ci->values_num > 0))
514   {
515     enqueue_cache_item (ci, TAIL);
516   }
517   else if ((do_shutdown != 0)
518       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
519       && (ci->values_num > 0))
520   {
521     enqueue_cache_item (ci, TAIL);
522   }
523   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
524       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
525       && (ci->values_num <= 0))
526   {
527     char **temp;
528
529     temp = (char **) realloc (cfd->keys,
530         sizeof (char *) * (cfd->keys_num + 1));
531     if (temp == NULL)
532     {
533       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
534       return (FALSE);
535     }
536     cfd->keys = temp;
537     /* Make really sure this points to the _same_ place */
538     assert ((char *) key == ci->file);
539     cfd->keys[cfd->keys_num] = (char *) key;
540     cfd->keys_num++;
541   }
542
543   return (FALSE);
544 } /* }}} gboolean tree_callback_flush */
545
546 static int flush_old_values (int max_age)
547 {
548   callback_flush_data_t cfd;
549   size_t k;
550
551   memset (&cfd, 0, sizeof (cfd));
552   /* Pass the current time as user data so that we don't need to call
553    * `time' for each node. */
554   cfd.now = time (NULL);
555   cfd.keys = NULL;
556   cfd.keys_num = 0;
557
558   if (max_age > 0)
559     cfd.abs_timeout = cfd.now - max_age;
560   else
561     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
562
563   /* `tree_callback_flush' will return the keys of all values that haven't
564    * been touched in the last `config_flush_interval' seconds in `cfd'.
565    * The char*'s in this array point to the same memory as ci->file, so we
566    * don't need to free them separately. */
567   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
568
569   for (k = 0; k < cfd.keys_num; k++)
570   {
571     cache_item_t *ci;
572
573     /* This must not fail. */
574     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
575     assert (ci != NULL);
576
577     /* If we end up here with values available, something's seriously
578      * messed up. */
579     assert (ci->values_num == 0);
580
581     /* Remove the node from the tree */
582     g_tree_remove (cache_tree, cfd.keys[k]);
583     cfd.keys[k] = NULL;
584
585     /* Now free and clean up `ci'. */
586     free (ci->file);
587     ci->file = NULL;
588     free (ci);
589     ci = NULL;
590   } /* for (k = 0; k < cfd.keys_num; k++) */
591
592   if (cfd.keys != NULL)
593   {
594     free (cfd.keys);
595     cfd.keys = NULL;
596   }
597
598   return (0);
599 } /* int flush_old_values */
600
601 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
602 {
603   struct timeval now;
604   struct timespec next_flush;
605   int final_flush = 0; /* make sure we only flush once on shutdown */
606
607   gettimeofday (&now, NULL);
608   next_flush.tv_sec = now.tv_sec + config_flush_interval;
609   next_flush.tv_nsec = 1000 * now.tv_usec;
610
611   pthread_mutex_lock (&cache_lock);
612   while ((do_shutdown == 0) || (cache_queue_head != NULL))
613   {
614     cache_item_t *ci;
615     char *file;
616     char **values;
617     int values_num;
618     int status;
619     int i;
620
621     /* First, check if it's time to do the cache flush. */
622     gettimeofday (&now, NULL);
623     if ((now.tv_sec > next_flush.tv_sec)
624         || ((now.tv_sec == next_flush.tv_sec)
625           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
626     {
627       /* Flush all values that haven't been written in the last
628        * `config_write_interval' seconds. */
629       flush_old_values (config_write_interval);
630
631       /* Determine the time of the next cache flush. */
632       while (next_flush.tv_sec <= now.tv_sec)
633         next_flush.tv_sec += config_flush_interval;
634
635       /* unlock the cache while we rotate so we don't block incoming
636        * updates if the fsync() blocks on disk I/O */
637       pthread_mutex_unlock(&cache_lock);
638       journal_rotate();
639       pthread_mutex_lock(&cache_lock);
640     }
641
642     /* Now, check if there's something to store away. If not, wait until
643      * something comes in or it's time to do the cache flush.  if we are
644      * shutting down, do not wait around.  */
645     if (cache_queue_head == NULL && !do_shutdown)
646     {
647       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
648       if ((status != 0) && (status != ETIMEDOUT))
649       {
650         RRDD_LOG (LOG_ERR, "queue_thread_main: "
651             "pthread_cond_timedwait returned %i.", status);
652       }
653     }
654
655     /* We're about to shut down */
656     if (do_shutdown != 0 && !final_flush++)
657     {
658       if (config_flush_at_shutdown)
659         flush_old_values (-1); /* flush everything */
660       else
661         break;
662     }
663
664     /* Check if a value has arrived. This may be NULL if we timed out or there
665      * was an interrupt such as a signal. */
666     if (cache_queue_head == NULL)
667       continue;
668
669     ci = cache_queue_head;
670
671     /* copy the relevant parts */
672     file = strdup (ci->file);
673     if (file == NULL)
674     {
675       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
676       continue;
677     }
678
679     assert(ci->values != NULL);
680     assert(ci->values_num > 0);
681
682     values = ci->values;
683     values_num = ci->values_num;
684
685     _wipe_ci_values(ci, time(NULL));
686
687     cache_queue_head = ci->next;
688     if (cache_queue_head == NULL)
689       cache_queue_tail = NULL;
690     ci->next = NULL;
691
692     pthread_mutex_lock (&stats_lock);
693     assert (stats_queue_length > 0);
694     stats_queue_length--;
695     pthread_mutex_unlock (&stats_lock);
696
697     pthread_mutex_unlock (&cache_lock);
698
699     rrd_clear_error ();
700     status = rrd_update_r (file, NULL, values_num, (void *) values);
701     if (status != 0)
702     {
703       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
704           "rrd_update_r (%s) failed with status %i. (%s)",
705           file, status, rrd_get_error());
706     }
707
708     journal_write("wrote", file);
709     pthread_cond_broadcast(&ci->flushed);
710
711     for (i = 0; i < values_num; i++)
712       free (values[i]);
713
714     free(values);
715     free(file);
716
717     if (status == 0)
718     {
719       pthread_mutex_lock (&stats_lock);
720       stats_updates_written++;
721       stats_data_sets_written += values_num;
722       pthread_mutex_unlock (&stats_lock);
723     }
724
725     pthread_mutex_lock (&cache_lock);
726
727     /* We're about to shut down */
728     if (do_shutdown != 0 && !final_flush++)
729     {
730       if (config_flush_at_shutdown)
731           flush_old_values (-1); /* flush everything */
732       else
733         break;
734     }
735   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
736   pthread_mutex_unlock (&cache_lock);
737
738   if (config_flush_at_shutdown)
739   {
740     assert(cache_queue_head == NULL);
741     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
742   }
743
744   journal_done();
745
746   return (NULL);
747 } /* }}} void *queue_thread_main */
748
749 static int buffer_get_field (char **buffer_ret, /* {{{ */
750     size_t *buffer_size_ret, char **field_ret)
751 {
752   char *buffer;
753   size_t buffer_pos;
754   size_t buffer_size;
755   char *field;
756   size_t field_size;
757   int status;
758
759   buffer = *buffer_ret;
760   buffer_pos = 0;
761   buffer_size = *buffer_size_ret;
762   field = *buffer_ret;
763   field_size = 0;
764
765   if (buffer_size <= 0)
766     return (-1);
767
768   /* This is ensured by `handle_request'. */
769   assert (buffer[buffer_size - 1] == '\0');
770
771   status = -1;
772   while (buffer_pos < buffer_size)
773   {
774     /* Check for end-of-field or end-of-buffer */
775     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
776     {
777       field[field_size] = 0;
778       field_size++;
779       buffer_pos++;
780       status = 0;
781       break;
782     }
783     /* Handle escaped characters. */
784     else if (buffer[buffer_pos] == '\\')
785     {
786       if (buffer_pos >= (buffer_size - 1))
787         break;
788       buffer_pos++;
789       field[field_size] = buffer[buffer_pos];
790       field_size++;
791       buffer_pos++;
792     }
793     /* Normal operation */ 
794     else
795     {
796       field[field_size] = buffer[buffer_pos];
797       field_size++;
798       buffer_pos++;
799     }
800   } /* while (buffer_pos < buffer_size) */
801
802   if (status != 0)
803     return (status);
804
805   *buffer_ret = buffer + buffer_pos;
806   *buffer_size_ret = buffer_size - buffer_pos;
807   *field_ret = field;
808
809   return (0);
810 } /* }}} int buffer_get_field */
811
812 static int flush_file (const char *filename) /* {{{ */
813 {
814   cache_item_t *ci;
815
816   pthread_mutex_lock (&cache_lock);
817
818   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
819   if (ci == NULL)
820   {
821     pthread_mutex_unlock (&cache_lock);
822     return (ENOENT);
823   }
824
825   if (ci->values_num > 0)
826   {
827     /* Enqueue at head */
828     enqueue_cache_item (ci, HEAD);
829     pthread_cond_wait(&ci->flushed, &cache_lock);
830   }
831
832   pthread_mutex_unlock(&cache_lock);
833
834   return (0);
835 } /* }}} int flush_file */
836
837 static int handle_request_help (int fd, /* {{{ */
838     char *buffer, size_t buffer_size)
839 {
840   int status;
841   char **help_text;
842   size_t help_text_len;
843   char *command;
844   size_t i;
845
846   char *help_help[] =
847   {
848     "5 Command overview\n",
849     "FLUSH <filename>\n",
850     "FLUSHALL\n",
851     "HELP [<command>]\n",
852     "UPDATE <filename> <values> [<values> ...]\n",
853     "STATS\n"
854   };
855   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
856
857   char *help_flush[] =
858   {
859     "4 Help for FLUSH\n",
860     "Usage: FLUSH <filename>\n",
861     "\n",
862     "Adds the given filename to the head of the update queue and returns\n",
863     "after is has been dequeued.\n"
864   };
865   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
866
867   char *help_flushall[] =
868   {
869     "3 Help for FLUSHALL\n",
870     "Usage: FLUSHALL\n",
871     "\n",
872     "Triggers writing of all pending updates.  Returns immediately.\n"
873   };
874   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
875
876   char *help_update[] =
877   {
878     "9 Help for UPDATE\n",
879     "Usage: UPDATE <filename> <values> [<values> ...]\n"
880     "\n",
881     "Adds the given file to the internal cache if it is not yet known and\n",
882     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
883     "for details.\n",
884     "\n",
885     "Each <values> has the following form:\n",
886     "  <values> = <time>:<value>[:<value>[...]]\n",
887     "See the rrdupdate(1) manpage for details.\n"
888   };
889   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
890
891   char *help_stats[] =
892   {
893     "4 Help for STATS\n",
894     "Usage: STATS\n",
895     "\n",
896     "Returns some performance counters, see the rrdcached(1) manpage for\n",
897     "a description of the values.\n"
898   };
899   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
900
901   status = buffer_get_field (&buffer, &buffer_size, &command);
902   if (status != 0)
903   {
904     help_text = help_help;
905     help_text_len = help_help_len;
906   }
907   else
908   {
909     if (strcasecmp (command, "update") == 0)
910     {
911       help_text = help_update;
912       help_text_len = help_update_len;
913     }
914     else if (strcasecmp (command, "flush") == 0)
915     {
916       help_text = help_flush;
917       help_text_len = help_flush_len;
918     }
919     else if (strcasecmp (command, "flushall") == 0)
920     {
921       help_text = help_flushall;
922       help_text_len = help_flushall_len;
923     }
924     else if (strcasecmp (command, "stats") == 0)
925     {
926       help_text = help_stats;
927       help_text_len = help_stats_len;
928     }
929     else
930     {
931       help_text = help_help;
932       help_text_len = help_help_len;
933     }
934   }
935
936   for (i = 0; i < help_text_len; i++)
937   {
938     status = swrite (fd, help_text[i], strlen (help_text[i]));
939     if (status < 0)
940     {
941       status = errno;
942       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
943       return (status);
944     }
945   }
946
947   return (0);
948 } /* }}} int handle_request_help */
949
950 static int handle_request_stats (int fd, /* {{{ */
951     char *buffer __attribute__((unused)),
952     size_t buffer_size __attribute__((unused)))
953 {
954   int status;
955   char outbuf[CMD_MAX];
956
957   uint64_t copy_queue_length;
958   uint64_t copy_updates_received;
959   uint64_t copy_flush_received;
960   uint64_t copy_updates_written;
961   uint64_t copy_data_sets_written;
962   uint64_t copy_journal_bytes;
963   uint64_t copy_journal_rotate;
964
965   uint64_t tree_nodes_number;
966   uint64_t tree_depth;
967
968   pthread_mutex_lock (&stats_lock);
969   copy_queue_length       = stats_queue_length;
970   copy_updates_received   = stats_updates_received;
971   copy_flush_received     = stats_flush_received;
972   copy_updates_written    = stats_updates_written;
973   copy_data_sets_written  = stats_data_sets_written;
974   copy_journal_bytes      = stats_journal_bytes;
975   copy_journal_rotate     = stats_journal_rotate;
976   pthread_mutex_unlock (&stats_lock);
977
978   pthread_mutex_lock (&cache_lock);
979   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
980   tree_depth        = (uint64_t) g_tree_height (cache_tree);
981   pthread_mutex_unlock (&cache_lock);
982
983 #define RRDD_STATS_SEND \
984   outbuf[sizeof (outbuf) - 1] = 0; \
985   status = swrite (fd, outbuf, strlen (outbuf)); \
986   if (status < 0) \
987   { \
988     status = errno; \
989     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
990     return (status); \
991   }
992
993   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
994   RRDD_STATS_SEND;
995
996   snprintf (outbuf, sizeof (outbuf),
997       "QueueLength: %"PRIu64"\n", copy_queue_length);
998   RRDD_STATS_SEND;
999
1000   snprintf (outbuf, sizeof (outbuf),
1001       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1002   RRDD_STATS_SEND;
1003
1004   snprintf (outbuf, sizeof (outbuf),
1005       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1006   RRDD_STATS_SEND;
1007
1008   snprintf (outbuf, sizeof (outbuf),
1009       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1010   RRDD_STATS_SEND;
1011
1012   snprintf (outbuf, sizeof (outbuf),
1013       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1014   RRDD_STATS_SEND;
1015
1016   snprintf (outbuf, sizeof (outbuf),
1017       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1018   RRDD_STATS_SEND;
1019
1020   snprintf (outbuf, sizeof (outbuf),
1021       "TreeDepth: %"PRIu64"\n", tree_depth);
1022   RRDD_STATS_SEND;
1023
1024   snprintf (outbuf, sizeof(outbuf),
1025       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1026   RRDD_STATS_SEND;
1027
1028   snprintf (outbuf, sizeof(outbuf),
1029       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1030   RRDD_STATS_SEND;
1031
1032   return (0);
1033 #undef RRDD_STATS_SEND
1034 } /* }}} int handle_request_stats */
1035
1036 static int handle_request_flush (int fd, /* {{{ */
1037     char *buffer, size_t buffer_size)
1038 {
1039   char *file;
1040   int status;
1041   char result[CMD_MAX];
1042
1043   status = buffer_get_field (&buffer, &buffer_size, &file);
1044   if (status != 0)
1045   {
1046     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1047   }
1048   else
1049   {
1050     pthread_mutex_lock(&stats_lock);
1051     stats_flush_received++;
1052     pthread_mutex_unlock(&stats_lock);
1053
1054     status = flush_file (file);
1055     if (status == 0)
1056       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1057     else if (status == ENOENT)
1058     {
1059       /* no file in our tree; see whether it exists at all */
1060       struct stat statbuf;
1061
1062       memset(&statbuf, 0, sizeof(statbuf));
1063       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1064         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1065       else
1066         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1067     }
1068     else if (status < 0)
1069       strncpy (result, "-1 Internal error.\n", sizeof (result));
1070     else
1071       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1072   }
1073   result[sizeof (result) - 1] = 0;
1074
1075   status = swrite (fd, result, strlen (result));
1076   if (status < 0)
1077   {
1078     status = errno;
1079     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1080     return (status);
1081   }
1082
1083   return (0);
1084 } /* }}} int handle_request_flush */
1085
1086 static int handle_request_flushall(int fd) /* {{{ */
1087 {
1088   int status;
1089   char answer[] ="0 Started flush.\n";
1090
1091   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1092
1093   pthread_mutex_lock(&cache_lock);
1094   flush_old_values(-1);
1095   pthread_mutex_unlock(&cache_lock);
1096
1097   status = swrite(fd, answer, strlen(answer));
1098   if (status < 0)
1099   {
1100     status = errno;
1101     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1102   }
1103
1104   return (status);
1105 } /* }}} static int handle_request_flushall */
1106
1107 static int handle_request_update (int fd, /* {{{ */
1108     char *buffer, size_t buffer_size)
1109 {
1110   char *file;
1111   int values_num = 0;
1112   int status;
1113
1114   time_t now;
1115
1116   cache_item_t *ci;
1117   char answer[CMD_MAX];
1118
1119 #define RRDD_UPDATE_SEND \
1120   answer[sizeof (answer) - 1] = 0; \
1121   status = swrite (fd, answer, strlen (answer)); \
1122   if (status < 0) \
1123   { \
1124     status = errno; \
1125     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1126     return (status); \
1127   }
1128
1129   now = time (NULL);
1130
1131   status = buffer_get_field (&buffer, &buffer_size, &file);
1132   if (status != 0)
1133   {
1134     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1135         sizeof (answer));
1136     RRDD_UPDATE_SEND;
1137     return (0);
1138   }
1139
1140   pthread_mutex_lock(&stats_lock);
1141   stats_updates_received++;
1142   pthread_mutex_unlock(&stats_lock);
1143
1144   pthread_mutex_lock (&cache_lock);
1145   ci = g_tree_lookup (cache_tree, file);
1146
1147   if (ci == NULL) /* {{{ */
1148   {
1149     struct stat statbuf;
1150
1151     /* don't hold the lock while we setup; stat(2) might block */
1152     pthread_mutex_unlock(&cache_lock);
1153
1154     memset (&statbuf, 0, sizeof (statbuf));
1155     status = stat (file, &statbuf);
1156     if (status != 0)
1157     {
1158       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1159
1160       status = errno;
1161       if (status == ENOENT)
1162         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1163       else
1164         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1165             status);
1166       RRDD_UPDATE_SEND;
1167       return (0);
1168     }
1169     if (!S_ISREG (statbuf.st_mode))
1170     {
1171       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1172       RRDD_UPDATE_SEND;
1173       return (0);
1174     }
1175     if (access(file, R_OK|W_OK) != 0)
1176     {
1177       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1178                 file, rrd_strerror(errno));
1179       RRDD_UPDATE_SEND;
1180       return (0);
1181     }
1182
1183     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1184     if (ci == NULL)
1185     {
1186       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1187
1188       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1189       RRDD_UPDATE_SEND;
1190       return (0);
1191     }
1192     memset (ci, 0, sizeof (cache_item_t));
1193
1194     ci->file = strdup (file);
1195     if (ci->file == NULL)
1196     {
1197       free (ci);
1198       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1199
1200       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1201       RRDD_UPDATE_SEND;
1202       return (0);
1203     }
1204
1205     _wipe_ci_values(ci, now);
1206     ci->flags = CI_FLAGS_IN_TREE;
1207
1208     pthread_mutex_lock(&cache_lock);
1209     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1210   } /* }}} */
1211   assert (ci != NULL);
1212
1213   while (buffer_size > 0)
1214   {
1215     char **temp;
1216     char *value;
1217
1218     status = buffer_get_field (&buffer, &buffer_size, &value);
1219     if (status != 0)
1220     {
1221       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1222       break;
1223     }
1224
1225     temp = (char **) realloc (ci->values,
1226         sizeof (char *) * (ci->values_num + 1));
1227     if (temp == NULL)
1228     {
1229       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1230       continue;
1231     }
1232     ci->values = temp;
1233
1234     ci->values[ci->values_num] = strdup (value);
1235     if (ci->values[ci->values_num] == NULL)
1236     {
1237       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1238       continue;
1239     }
1240     ci->values_num++;
1241
1242     values_num++;
1243   }
1244
1245   if (((now - ci->last_flush_time) >= config_write_interval)
1246       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1247       && (ci->values_num > 0))
1248   {
1249     enqueue_cache_item (ci, TAIL);
1250   }
1251
1252   pthread_mutex_unlock (&cache_lock);
1253
1254   if (values_num < 1)
1255   {
1256     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1257   }
1258   else
1259   {
1260     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1261         (values_num == 1) ? "" : "s");
1262   }
1263   RRDD_UPDATE_SEND;
1264   return (0);
1265 #undef RRDD_UPDATE_SEND
1266 } /* }}} int handle_request_update */
1267
1268 /* we came across a "WROTE" entry during journal replay.
1269  * throw away any values that we have accumulated for this file
1270  */
1271 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1272                                  const char *buffer,
1273                                  size_t buffer_size __attribute__((unused)))
1274 {
1275   int i;
1276   cache_item_t *ci;
1277   const char *file = buffer;
1278
1279   pthread_mutex_lock(&cache_lock);
1280
1281   ci = g_tree_lookup(cache_tree, file);
1282   if (ci == NULL)
1283   {
1284     pthread_mutex_unlock(&cache_lock);
1285     return (0);
1286   }
1287
1288   if (ci->values)
1289   {
1290     for (i=0; i < ci->values_num; i++)
1291       free(ci->values[i]);
1292
1293     free(ci->values);
1294   }
1295
1296   _wipe_ci_values(ci, time(NULL));
1297
1298   pthread_mutex_unlock(&cache_lock);
1299   return (0);
1300 } /* }}} int handle_request_wrote */
1301
1302 /* returns 1 if we have the required privilege level */
1303 static int has_privilege (socket_privilege priv, /* {{{ */
1304                           socket_privilege required, int fd)
1305 {
1306   int status;
1307   char error[CMD_MAX];
1308
1309   if (priv >= required)
1310     return 1;
1311
1312   sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1313   status = swrite(fd, error, strlen(error));
1314
1315   if (status < 0)
1316     return status;
1317   else
1318     return 0;
1319 } /* }}} static int has_privilege */
1320
1321 /* if fd < 0, we are in journal replay mode */
1322 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1323                            char *buffer, size_t buffer_size)
1324 {
1325   char *buffer_ptr;
1326   char *command;
1327   int status;
1328
1329   assert (buffer[buffer_size - 1] == '\0');
1330
1331   buffer_ptr = buffer;
1332   command = NULL;
1333   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1334   if (status != 0)
1335   {
1336     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1337     return (-1);
1338   }
1339
1340   if (strcasecmp (command, "update") == 0)
1341   {
1342     /* don't re-write updates in replay mode */
1343     if (fd >= 0)
1344       journal_write(command, buffer_ptr);
1345
1346     status = has_privilege(privilege, PRIV_HIGH, fd);
1347     if (status <= 0)
1348       return status;
1349
1350     return (handle_request_update (fd, buffer_ptr, buffer_size));
1351   }
1352   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1353   {
1354     /* this is only valid in replay mode */
1355     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1356   }
1357   else if (strcasecmp (command, "flush") == 0)
1358   {
1359     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1360   }
1361   else if (strcasecmp (command, "flushall") == 0)
1362   {
1363     status = has_privilege(privilege, PRIV_HIGH, fd);
1364     if (status <= 0)
1365       return status;
1366
1367     return (handle_request_flushall(fd));
1368   }
1369   else if (strcasecmp (command, "stats") == 0)
1370   {
1371     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1372   }
1373   else if (strcasecmp (command, "help") == 0)
1374   {
1375     return (handle_request_help (fd, buffer_ptr, buffer_size));
1376   }
1377   else
1378   {
1379     char result[CMD_MAX];
1380
1381     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1382     result[sizeof (result) - 1] = 0;
1383
1384     status = swrite (fd, result, strlen (result));
1385     if (status < 0)
1386     {
1387       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1388       return (-1);
1389     }
1390   }
1391
1392   return (0);
1393 } /* }}} int handle_request */
1394
1395 /* MUST NOT hold journal_lock before calling this */
1396 static void journal_rotate(void) /* {{{ */
1397 {
1398   FILE *old_fh = NULL;
1399
1400   if (journal_cur == NULL || journal_old == NULL)
1401     return;
1402
1403   pthread_mutex_lock(&journal_lock);
1404
1405   /* we rotate this way (rename before close) so that the we can release
1406    * the journal lock as fast as possible.  Journal writes to the new
1407    * journal can proceed immediately after the new file is opened.  The
1408    * fclose can then block without affecting new updates.
1409    */
1410   if (journal_fh != NULL)
1411   {
1412     old_fh = journal_fh;
1413     rename(journal_cur, journal_old);
1414     ++stats_journal_rotate;
1415   }
1416
1417   journal_fh = fopen(journal_cur, "a");
1418   pthread_mutex_unlock(&journal_lock);
1419
1420   if (old_fh != NULL)
1421     fclose(old_fh);
1422
1423   if (journal_fh == NULL)
1424   {
1425     RRDD_LOG(LOG_CRIT,
1426              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1427              journal_cur, rrd_strerror(errno));
1428
1429     RRDD_LOG(LOG_ERR,
1430              "JOURNALING DISABLED: All values will be flushed at shutdown");
1431     config_flush_at_shutdown = 1;
1432   }
1433
1434 } /* }}} static void journal_rotate */
1435
1436 static void journal_done(void) /* {{{ */
1437 {
1438   if (journal_cur == NULL)
1439     return;
1440
1441   pthread_mutex_lock(&journal_lock);
1442   if (journal_fh != NULL)
1443   {
1444     fclose(journal_fh);
1445     journal_fh = NULL;
1446   }
1447
1448   if (config_flush_at_shutdown)
1449   {
1450     RRDD_LOG(LOG_INFO, "removing journals");
1451     unlink(journal_old);
1452     unlink(journal_cur);
1453   }
1454   else
1455   {
1456     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1457              "journals will be used at next startup");
1458   }
1459
1460   pthread_mutex_unlock(&journal_lock);
1461
1462 } /* }}} static void journal_done */
1463
1464 static int journal_write(char *cmd, char *args) /* {{{ */
1465 {
1466   int chars;
1467
1468   if (journal_fh == NULL)
1469     return 0;
1470
1471   pthread_mutex_lock(&journal_lock);
1472   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1473   pthread_mutex_unlock(&journal_lock);
1474
1475   if (chars > 0)
1476   {
1477     pthread_mutex_lock(&stats_lock);
1478     stats_journal_bytes += chars;
1479     pthread_mutex_unlock(&stats_lock);
1480   }
1481
1482   return chars;
1483 } /* }}} static int journal_write */
1484
1485 static int journal_replay (const char *file) /* {{{ */
1486 {
1487   FILE *fh;
1488   int entry_cnt = 0;
1489   int fail_cnt = 0;
1490   uint64_t line = 0;
1491   char entry[CMD_MAX];
1492
1493   if (file == NULL) return 0;
1494
1495   fh = fopen(file, "r");
1496   if (fh == NULL)
1497   {
1498     if (errno != ENOENT)
1499       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1500                file, rrd_strerror(errno));
1501     return 0;
1502   }
1503   else
1504     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1505
1506   while(!feof(fh))
1507   {
1508     size_t entry_len;
1509
1510     ++line;
1511     if (fgets(entry, sizeof(entry), fh) == NULL)
1512       break;
1513     entry_len = strlen(entry);
1514
1515     /* check \n termination in case journal writing crashed mid-line */
1516     if (entry_len == 0)
1517       continue;
1518     else if (entry[entry_len - 1] != '\n')
1519     {
1520       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1521       ++fail_cnt;
1522       continue;
1523     }
1524
1525     entry[entry_len - 1] = '\0';
1526
1527     if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1528       ++entry_cnt;
1529     else
1530       ++fail_cnt;
1531   }
1532
1533   fclose(fh);
1534
1535   if (entry_cnt > 0)
1536   {
1537     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1538              entry_cnt, fail_cnt);
1539     return 1;
1540   }
1541   else
1542     return 0;
1543
1544 } /* }}} static int journal_replay */
1545
1546 static void *connection_thread_main (void *args) /* {{{ */
1547 {
1548   pthread_t self;
1549   listen_socket_t *sock;
1550   int i;
1551   int fd;
1552
1553   sock = (listen_socket_t *) args;
1554   fd = sock->fd;
1555
1556   pthread_mutex_lock (&connection_threads_lock);
1557   {
1558     pthread_t *temp;
1559
1560     temp = (pthread_t *) realloc (connection_threads,
1561         sizeof (pthread_t) * (connection_threads_num + 1));
1562     if (temp == NULL)
1563     {
1564       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1565     }
1566     else
1567     {
1568       connection_threads = temp;
1569       connection_threads[connection_threads_num] = pthread_self ();
1570       connection_threads_num++;
1571     }
1572   }
1573   pthread_mutex_unlock (&connection_threads_lock);
1574
1575   while (do_shutdown == 0)
1576   {
1577     char buffer[CMD_MAX];
1578
1579     struct pollfd pollfd;
1580     int status;
1581
1582     pollfd.fd = fd;
1583     pollfd.events = POLLIN | POLLPRI;
1584     pollfd.revents = 0;
1585
1586     status = poll (&pollfd, 1, /* timeout = */ 500);
1587     if (do_shutdown)
1588       break;
1589     else if (status == 0) /* timeout */
1590       continue;
1591     else if (status < 0) /* error */
1592     {
1593       status = errno;
1594       if (status == EINTR)
1595         continue;
1596       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1597       continue;
1598     }
1599
1600     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1601     {
1602       close (fd);
1603       break;
1604     }
1605     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1606     {
1607       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1608           "poll(2) returned something unexpected: %#04hx",
1609           pollfd.revents);
1610       close (fd);
1611       break;
1612     }
1613
1614     status = (int) sread (fd, buffer, sizeof (buffer));
1615     if (status <= 0)
1616     {
1617       close (fd);
1618
1619       if (status < 0)
1620         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1621
1622       break;
1623     }
1624
1625     status = handle_request (fd, sock->privilege, buffer, status);
1626     if (status != 0)
1627       break;
1628   }
1629
1630   close(fd);
1631   free(args);
1632
1633   self = pthread_self ();
1634   /* Remove this thread from the connection threads list */
1635   pthread_mutex_lock (&connection_threads_lock);
1636   /* Find out own index in the array */
1637   for (i = 0; i < connection_threads_num; i++)
1638     if (pthread_equal (connection_threads[i], self) != 0)
1639       break;
1640   assert (i < connection_threads_num);
1641
1642   /* Move the trailing threads forward. */
1643   if (i < (connection_threads_num - 1))
1644   {
1645     memmove (connection_threads + i,
1646         connection_threads + i + 1,
1647         sizeof (pthread_t) * (connection_threads_num - i - 1));
1648   }
1649
1650   connection_threads_num--;
1651   pthread_mutex_unlock (&connection_threads_lock);
1652
1653   return (NULL);
1654 } /* }}} void *connection_thread_main */
1655
1656 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1657 {
1658   int fd;
1659   struct sockaddr_un sa;
1660   listen_socket_t *temp;
1661   int status;
1662   const char *path;
1663
1664   path = sock->addr;
1665   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1666     path += strlen("unix:");
1667
1668   temp = (listen_socket_t *) realloc (listen_fds,
1669       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1670   if (temp == NULL)
1671   {
1672     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1673     return (-1);
1674   }
1675   listen_fds = temp;
1676   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1677
1678   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1679   if (fd < 0)
1680   {
1681     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1682     return (-1);
1683   }
1684
1685   memset (&sa, 0, sizeof (sa));
1686   sa.sun_family = AF_UNIX;
1687   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1688
1689   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1690   if (status != 0)
1691   {
1692     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1693     close (fd);
1694     unlink (path);
1695     return (-1);
1696   }
1697
1698   status = listen (fd, /* backlog = */ 10);
1699   if (status != 0)
1700   {
1701     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1702     close (fd);
1703     unlink (path);
1704     return (-1);
1705   }
1706
1707   listen_fds[listen_fds_num].fd = fd;
1708   listen_fds[listen_fds_num].family = PF_UNIX;
1709   strncpy(listen_fds[listen_fds_num].addr, path,
1710           sizeof (listen_fds[listen_fds_num].addr) - 1);
1711   listen_fds_num++;
1712
1713   return (0);
1714 } /* }}} int open_listen_socket_unix */
1715
1716 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1717 {
1718   struct addrinfo ai_hints;
1719   struct addrinfo *ai_res;
1720   struct addrinfo *ai_ptr;
1721   char addr_copy[NI_MAXHOST];
1722   char *addr;
1723   char *port;
1724   int status;
1725
1726   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1727   addr_copy[sizeof (addr_copy) - 1] = 0;
1728   addr = addr_copy;
1729
1730   memset (&ai_hints, 0, sizeof (ai_hints));
1731   ai_hints.ai_flags = 0;
1732 #ifdef AI_ADDRCONFIG
1733   ai_hints.ai_flags |= AI_ADDRCONFIG;
1734 #endif
1735   ai_hints.ai_family = AF_UNSPEC;
1736   ai_hints.ai_socktype = SOCK_STREAM;
1737
1738   port = NULL;
1739   if (*addr == '[') /* IPv6+port format */
1740   {
1741     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1742     addr++;
1743
1744     port = strchr (addr, ']');
1745     if (port == NULL)
1746     {
1747       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1748           sock->addr);
1749       return (-1);
1750     }
1751     *port = 0;
1752     port++;
1753
1754     if (*port == ':')
1755       port++;
1756     else if (*port == 0)
1757       port = NULL;
1758     else
1759     {
1760       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1761           port);
1762       return (-1);
1763     }
1764   } /* if (*addr = ']') */
1765   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1766   {
1767     port = rindex(addr, ':');
1768     if (port != NULL)
1769     {
1770       *port = 0;
1771       port++;
1772     }
1773   }
1774   ai_res = NULL;
1775   status = getaddrinfo (addr,
1776                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1777                         &ai_hints, &ai_res);
1778   if (status != 0)
1779   {
1780     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1781         "%s", addr, gai_strerror (status));
1782     return (-1);
1783   }
1784
1785   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1786   {
1787     int fd;
1788     listen_socket_t *temp;
1789     int one = 1;
1790
1791     temp = (listen_socket_t *) realloc (listen_fds,
1792         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1793     if (temp == NULL)
1794     {
1795       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1796       continue;
1797     }
1798     listen_fds = temp;
1799     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1800
1801     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1802     if (fd < 0)
1803     {
1804       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1805       continue;
1806     }
1807
1808     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1809
1810     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1811     if (status != 0)
1812     {
1813       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1814       close (fd);
1815       continue;
1816     }
1817
1818     status = listen (fd, /* backlog = */ 10);
1819     if (status != 0)
1820     {
1821       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1822       close (fd);
1823       return (-1);
1824     }
1825
1826     listen_fds[listen_fds_num].fd = fd;
1827     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1828     listen_fds_num++;
1829   } /* for (ai_ptr) */
1830
1831   return (0);
1832 } /* }}} static int open_listen_socket_network */
1833
1834 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1835 {
1836   assert(sock != NULL);
1837   assert(sock->addr != NULL);
1838
1839   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1840       || sock->addr[0] == '/')
1841     return (open_listen_socket_unix(sock));
1842   else
1843     return (open_listen_socket_network(sock));
1844 } /* }}} int open_listen_socket */
1845
1846 static int close_listen_sockets (void) /* {{{ */
1847 {
1848   size_t i;
1849
1850   for (i = 0; i < listen_fds_num; i++)
1851   {
1852     close (listen_fds[i].fd);
1853
1854     if (listen_fds[i].family == PF_UNIX)
1855       unlink(listen_fds[i].addr);
1856   }
1857
1858   free (listen_fds);
1859   listen_fds = NULL;
1860   listen_fds_num = 0;
1861
1862   return (0);
1863 } /* }}} int close_listen_sockets */
1864
1865 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1866 {
1867   struct pollfd *pollfds;
1868   int pollfds_num;
1869   int status;
1870   int i;
1871
1872   for (i = 0; i < config_listen_address_list_len; i++)
1873     open_listen_socket (config_listen_address_list[i]);
1874
1875   if (config_listen_address_list_len < 1)
1876   {
1877     listen_socket_t sock;
1878     memset(&sock, 0, sizeof(sock));
1879     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1880     open_listen_socket (&sock);
1881   }
1882
1883   if (listen_fds_num < 1)
1884   {
1885     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1886         "could be opened. Sorry.");
1887     return (NULL);
1888   }
1889
1890   pollfds_num = listen_fds_num;
1891   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1892   if (pollfds == NULL)
1893   {
1894     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1895     return (NULL);
1896   }
1897   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1898
1899   RRDD_LOG(LOG_INFO, "listening for connections");
1900
1901   while (do_shutdown == 0)
1902   {
1903     assert (pollfds_num == ((int) listen_fds_num));
1904     for (i = 0; i < pollfds_num; i++)
1905     {
1906       pollfds[i].fd = listen_fds[i].fd;
1907       pollfds[i].events = POLLIN | POLLPRI;
1908       pollfds[i].revents = 0;
1909     }
1910
1911     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1912     if (do_shutdown)
1913       break;
1914     else if (status == 0) /* timeout */
1915       continue;
1916     else if (status < 0) /* error */
1917     {
1918       status = errno;
1919       if (status != EINTR)
1920       {
1921         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1922       }
1923       continue;
1924     }
1925
1926     for (i = 0; i < pollfds_num; i++)
1927     {
1928       listen_socket_t *client_sock;
1929       struct sockaddr_storage client_sa;
1930       socklen_t client_sa_size;
1931       pthread_t tid;
1932       pthread_attr_t attr;
1933
1934       if (pollfds[i].revents == 0)
1935         continue;
1936
1937       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1938       {
1939         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1940             "poll(2) returned something unexpected for listen FD #%i.",
1941             pollfds[i].fd);
1942         continue;
1943       }
1944
1945       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1946       if (client_sock == NULL)
1947       {
1948         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1949         continue;
1950       }
1951       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1952
1953       client_sa_size = sizeof (client_sa);
1954       client_sock->fd = accept (pollfds[i].fd,
1955           (struct sockaddr *) &client_sa, &client_sa_size);
1956       if (client_sock->fd < 0)
1957       {
1958         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1959         free(client_sock);
1960         continue;
1961       }
1962
1963       pthread_attr_init (&attr);
1964       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1965
1966       status = pthread_create (&tid, &attr, connection_thread_main,
1967                                client_sock);
1968       if (status != 0)
1969       {
1970         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1971         close (client_sock->fd);
1972         free (client_sock);
1973         continue;
1974       }
1975     } /* for (pollfds_num) */
1976   } /* while (do_shutdown == 0) */
1977
1978   RRDD_LOG(LOG_INFO, "starting shutdown");
1979
1980   close_listen_sockets ();
1981
1982   pthread_mutex_lock (&connection_threads_lock);
1983   while (connection_threads_num > 0)
1984   {
1985     pthread_t wait_for;
1986
1987     wait_for = connection_threads[0];
1988
1989     pthread_mutex_unlock (&connection_threads_lock);
1990     pthread_join (wait_for, /* retval = */ NULL);
1991     pthread_mutex_lock (&connection_threads_lock);
1992   }
1993   pthread_mutex_unlock (&connection_threads_lock);
1994
1995   return (NULL);
1996 } /* }}} void *listen_thread_main */
1997
1998 static int daemonize (void) /* {{{ */
1999 {
2000   int status;
2001   int fd;
2002   char *base_dir;
2003
2004   fd = open_pidfile();
2005   if (fd < 0) return fd;
2006
2007   if (!stay_foreground)
2008   {
2009     pid_t child;
2010
2011     child = fork ();
2012     if (child < 0)
2013     {
2014       fprintf (stderr, "daemonize: fork(2) failed.\n");
2015       return (-1);
2016     }
2017     else if (child > 0)
2018     {
2019       return (1);
2020     }
2021
2022     /* Become session leader */
2023     setsid ();
2024
2025     /* Open the first three file descriptors to /dev/null */
2026     close (2);
2027     close (1);
2028     close (0);
2029
2030     open ("/dev/null", O_RDWR);
2031     dup (0);
2032     dup (0);
2033   } /* if (!stay_foreground) */
2034
2035   /* Change into the /tmp directory. */
2036   base_dir = (config_base_dir != NULL)
2037     ? config_base_dir
2038     : "/tmp";
2039   status = chdir (base_dir);
2040   if (status != 0)
2041   {
2042     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2043     return (-1);
2044   }
2045
2046   install_signal_handlers();
2047
2048   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2049   RRDD_LOG(LOG_INFO, "starting up");
2050
2051   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2052   if (cache_tree == NULL)
2053   {
2054     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2055     return (-1);
2056   }
2057
2058   status = write_pidfile (fd);
2059   return status;
2060 } /* }}} int daemonize */
2061
2062 static int cleanup (void) /* {{{ */
2063 {
2064   do_shutdown++;
2065
2066   pthread_cond_signal (&cache_cond);
2067   pthread_join (queue_thread, /* return = */ NULL);
2068
2069   remove_pidfile ();
2070
2071   RRDD_LOG(LOG_INFO, "goodbye");
2072   closelog ();
2073
2074   return (0);
2075 } /* }}} int cleanup */
2076
2077 static int read_options (int argc, char **argv) /* {{{ */
2078 {
2079   int option;
2080   int status = 0;
2081
2082   while ((option = getopt(argc, argv, "gl:L:f:w:b:z:p:j:h?F")) != -1)
2083   {
2084     switch (option)
2085     {
2086       case 'g':
2087         stay_foreground=1;
2088         break;
2089
2090       case 'L':
2091       case 'l':
2092       {
2093         listen_socket_t **temp;
2094         listen_socket_t *new;
2095
2096         new = malloc(sizeof(listen_socket_t));
2097         if (new == NULL)
2098         {
2099           fprintf(stderr, "read_options: malloc failed.\n");
2100           return(2);
2101         }
2102         memset(new, 0, sizeof(listen_socket_t));
2103
2104         temp = (listen_socket_t **) realloc (config_listen_address_list,
2105             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2106         if (temp == NULL)
2107         {
2108           fprintf (stderr, "read_options: realloc failed.\n");
2109           return (2);
2110         }
2111         config_listen_address_list = temp;
2112
2113         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2114         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2115
2116         temp[config_listen_address_list_len] = new;
2117         config_listen_address_list_len++;
2118       }
2119       break;
2120
2121       case 'f':
2122       {
2123         int temp;
2124
2125         temp = atoi (optarg);
2126         if (temp > 0)
2127           config_flush_interval = temp;
2128         else
2129         {
2130           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2131           status = 3;
2132         }
2133       }
2134       break;
2135
2136       case 'w':
2137       {
2138         int temp;
2139
2140         temp = atoi (optarg);
2141         if (temp > 0)
2142           config_write_interval = temp;
2143         else
2144         {
2145           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2146           status = 2;
2147         }
2148       }
2149       break;
2150
2151       case 'z':
2152       {
2153         int temp;
2154
2155         temp = atoi(optarg);
2156         if (temp > 0)
2157           config_write_jitter = temp;
2158         else
2159         {
2160           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2161           status = 2;
2162         }
2163
2164         break;
2165       }
2166
2167       case 'b':
2168       {
2169         size_t len;
2170
2171         if (config_base_dir != NULL)
2172           free (config_base_dir);
2173         config_base_dir = strdup (optarg);
2174         if (config_base_dir == NULL)
2175         {
2176           fprintf (stderr, "read_options: strdup failed.\n");
2177           return (3);
2178         }
2179
2180         len = strlen (config_base_dir);
2181         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2182         {
2183           config_base_dir[len - 1] = 0;
2184           len--;
2185         }
2186
2187         if (len < 1)
2188         {
2189           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2190           return (4);
2191         }
2192       }
2193       break;
2194
2195       case 'p':
2196       {
2197         if (config_pid_file != NULL)
2198           free (config_pid_file);
2199         config_pid_file = strdup (optarg);
2200         if (config_pid_file == NULL)
2201         {
2202           fprintf (stderr, "read_options: strdup failed.\n");
2203           return (3);
2204         }
2205       }
2206       break;
2207
2208       case 'F':
2209         config_flush_at_shutdown = 1;
2210         break;
2211
2212       case 'j':
2213       {
2214         struct stat statbuf;
2215         const char *dir = optarg;
2216
2217         status = stat(dir, &statbuf);
2218         if (status != 0)
2219         {
2220           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2221           return 6;
2222         }
2223
2224         if (!S_ISDIR(statbuf.st_mode)
2225             || access(dir, R_OK|W_OK|X_OK) != 0)
2226         {
2227           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2228                   errno ? rrd_strerror(errno) : "");
2229           return 6;
2230         }
2231
2232         journal_cur = malloc(PATH_MAX + 1);
2233         journal_old = malloc(PATH_MAX + 1);
2234         if (journal_cur == NULL || journal_old == NULL)
2235         {
2236           fprintf(stderr, "malloc failure for journal files\n");
2237           return 6;
2238         }
2239         else 
2240         {
2241           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2242           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2243         }
2244       }
2245       break;
2246
2247       case 'h':
2248       case '?':
2249         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2250             "\n"
2251             "Usage: rrdcached [options]\n"
2252             "\n"
2253             "Valid options are:\n"
2254             "  -l <address>  Socket address to listen to.\n"
2255             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2256             "  -w <seconds>  Interval in which to write data.\n"
2257             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2258             "  -f <seconds>  Interval in which to flush dead data.\n"
2259             "  -p <file>     Location of the PID-file.\n"
2260             "  -b <dir>      Base directory to change to.\n"
2261             "  -g            Do not fork and run in the foreground.\n"
2262             "  -j <dir>      Directory in which to create the journal files.\n"
2263             "  -F            Always flush all updates at shutdown\n"
2264             "\n"
2265             "For more information and a detailed description of all options "
2266             "please refer\n"
2267             "to the rrdcached(1) manual page.\n",
2268             VERSION);
2269         status = -1;
2270         break;
2271     } /* switch (option) */
2272   } /* while (getopt) */
2273
2274   /* advise the user when values are not sane */
2275   if (config_flush_interval < 2 * config_write_interval)
2276     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2277             " 2x write interval (-w) !\n");
2278   if (config_write_jitter > config_write_interval)
2279     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2280             " write interval (-w) !\n");
2281
2282   if (journal_cur == NULL)
2283     config_flush_at_shutdown = 1;
2284
2285   return (status);
2286 } /* }}} int read_options */
2287
2288 int main (int argc, char **argv)
2289 {
2290   int status;
2291
2292   status = read_options (argc, argv);
2293   if (status != 0)
2294   {
2295     if (status < 0)
2296       status = 0;
2297     return (status);
2298   }
2299
2300   status = daemonize ();
2301   if (status == 1)
2302   {
2303     struct sigaction sigchld;
2304
2305     memset (&sigchld, 0, sizeof (sigchld));
2306     sigchld.sa_handler = SIG_IGN;
2307     sigaction (SIGCHLD, &sigchld, NULL);
2308
2309     return (0);
2310   }
2311   else if (status != 0)
2312   {
2313     fprintf (stderr, "daemonize failed, exiting.\n");
2314     return (1);
2315   }
2316
2317   if (journal_cur != NULL)
2318   {
2319     int had_journal = 0;
2320
2321     pthread_mutex_lock(&journal_lock);
2322
2323     RRDD_LOG(LOG_INFO, "checking for journal files");
2324
2325     had_journal += journal_replay(journal_old);
2326     had_journal += journal_replay(journal_cur);
2327
2328     if (had_journal)
2329       flush_old_values(-1);
2330
2331     pthread_mutex_unlock(&journal_lock);
2332     journal_rotate();
2333
2334     RRDD_LOG(LOG_INFO, "journal processing complete");
2335   }
2336
2337   /* start the queue thread */
2338   memset (&queue_thread, 0, sizeof (queue_thread));
2339   status = pthread_create (&queue_thread,
2340                            NULL, /* attr */
2341                            queue_thread_main,
2342                            NULL); /* args */
2343   if (status != 0)
2344   {
2345     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2346     cleanup();
2347     return (1);
2348   }
2349
2350   listen_thread_main (NULL);
2351   cleanup ();
2352
2353   return (0);
2354 } /* int main */
2355
2356 /*
2357  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2358  */