When -B is specified, the daemon will only operate on files within the
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 struct listen_socket_s
111 {
112   int fd;
113   char addr[PATH_MAX + 1];
114   int family;
115   socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
118
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
122 {
123   char *file;
124   char **values;
125   int values_num;
126   time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE  (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129   int flags;
130   pthread_cond_t  flushed;
131   cache_item_t *next;
132 };
133
134 struct callback_flush_data_s
135 {
136   time_t now;
137   time_t abs_timeout;
138   char **keys;
139   size_t keys_num;
140 };
141 typedef struct callback_flush_data_s callback_flush_data_t;
142
143 enum queue_side_e
144 {
145   HEAD,
146   TAIL
147 };
148 typedef enum queue_side_e queue_side_t;
149
150 /* max length of socket command or response */
151 #define CMD_MAX 4096
152
153 /*
154  * Variables
155  */
156 static int stay_foreground = 0;
157
158 static listen_socket_t *listen_fds = NULL;
159 static size_t listen_fds_num = 0;
160
161 static int do_shutdown = 0;
162
163 static pthread_t queue_thread;
164
165 static pthread_t *connection_threads = NULL;
166 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
167 static int connection_threads_num = 0;
168
169 /* Cache stuff */
170 static GTree          *cache_tree = NULL;
171 static cache_item_t   *cache_queue_head = NULL;
172 static cache_item_t   *cache_queue_tail = NULL;
173 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
174 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
175
176 static int config_write_interval = 300;
177 static int config_write_jitter   = 0;
178 static int config_flush_interval = 3600;
179 static int config_flush_at_shutdown = 0;
180 static char *config_pid_file = NULL;
181 static char *config_base_dir = NULL;
182 static size_t _config_base_dir_len = 0;
183 static int config_write_base_only = 0;
184
185 static listen_socket_t **config_listen_address_list = NULL;
186 static int config_listen_address_list_len = 0;
187
188 static uint64_t stats_queue_length = 0;
189 static uint64_t stats_updates_received = 0;
190 static uint64_t stats_flush_received = 0;
191 static uint64_t stats_updates_written = 0;
192 static uint64_t stats_data_sets_written = 0;
193 static uint64_t stats_journal_bytes = 0;
194 static uint64_t stats_journal_rotate = 0;
195 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
196
197 /* Journaled updates */
198 static char *journal_cur = NULL;
199 static char *journal_old = NULL;
200 static FILE *journal_fh = NULL;
201 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
202 static int journal_write(char *cmd, char *args);
203 static void journal_done(void);
204 static void journal_rotate(void);
205
206 /* 
207  * Functions
208  */
209 static void sig_common (const char *sig) /* {{{ */
210 {
211   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
212   do_shutdown++;
213   pthread_cond_broadcast(&cache_cond);
214 } /* }}} void sig_common */
215
216 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
217 {
218   sig_common("INT");
219 } /* }}} void sig_int_handler */
220
221 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
222 {
223   sig_common("TERM");
224 } /* }}} void sig_term_handler */
225
226 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
227 {
228   config_flush_at_shutdown = 1;
229   sig_common("USR1");
230 } /* }}} void sig_usr1_handler */
231
232 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
233 {
234   config_flush_at_shutdown = 0;
235   sig_common("USR2");
236 } /* }}} void sig_usr2_handler */
237
238 static void install_signal_handlers(void) /* {{{ */
239 {
240   /* These structures are static, because `sigaction' behaves weird if the are
241    * overwritten.. */
242   static struct sigaction sa_int;
243   static struct sigaction sa_term;
244   static struct sigaction sa_pipe;
245   static struct sigaction sa_usr1;
246   static struct sigaction sa_usr2;
247
248   /* Install signal handlers */
249   memset (&sa_int, 0, sizeof (sa_int));
250   sa_int.sa_handler = sig_int_handler;
251   sigaction (SIGINT, &sa_int, NULL);
252
253   memset (&sa_term, 0, sizeof (sa_term));
254   sa_term.sa_handler = sig_term_handler;
255   sigaction (SIGTERM, &sa_term, NULL);
256
257   memset (&sa_pipe, 0, sizeof (sa_pipe));
258   sa_pipe.sa_handler = SIG_IGN;
259   sigaction (SIGPIPE, &sa_pipe, NULL);
260
261   memset (&sa_pipe, 0, sizeof (sa_usr1));
262   sa_usr1.sa_handler = sig_usr1_handler;
263   sigaction (SIGUSR1, &sa_usr1, NULL);
264
265   memset (&sa_usr2, 0, sizeof (sa_usr2));
266   sa_usr2.sa_handler = sig_usr2_handler;
267   sigaction (SIGUSR2, &sa_usr2, NULL);
268
269 } /* }}} void install_signal_handlers */
270
271 static int open_pidfile(void) /* {{{ */
272 {
273   int fd;
274   char *file;
275
276   file = (config_pid_file != NULL)
277     ? config_pid_file
278     : LOCALSTATEDIR "/run/rrdcached.pid";
279
280   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
281   if (fd < 0)
282     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
283             file, rrd_strerror(errno));
284
285   return(fd);
286 } /* }}} static int open_pidfile */
287
288 static int write_pidfile (int fd) /* {{{ */
289 {
290   pid_t pid;
291   FILE *fh;
292
293   pid = getpid ();
294
295   fh = fdopen (fd, "w");
296   if (fh == NULL)
297   {
298     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
299     close(fd);
300     return (-1);
301   }
302
303   fprintf (fh, "%i\n", (int) pid);
304   fclose (fh);
305
306   return (0);
307 } /* }}} int write_pidfile */
308
309 static int remove_pidfile (void) /* {{{ */
310 {
311   char *file;
312   int status;
313
314   file = (config_pid_file != NULL)
315     ? config_pid_file
316     : LOCALSTATEDIR "/run/rrdcached.pid";
317
318   status = unlink (file);
319   if (status == 0)
320     return (0);
321   return (errno);
322 } /* }}} int remove_pidfile */
323
324 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
325 {
326   char    *buffer;
327   size_t   buffer_used;
328   size_t   buffer_free;
329   ssize_t  status;
330
331   buffer       = (char *) buffer_void;
332   buffer_used  = 0;
333   buffer_free  = buffer_size;
334
335   while (buffer_free > 0)
336   {
337     status = read (fd, buffer + buffer_used, buffer_free);
338     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
339       continue;
340
341     if (status < 0)
342       return (-1);
343
344     if (status == 0)
345       return (0);
346
347     assert ((0 > status) || (buffer_free >= (size_t) status));
348
349     buffer_free = buffer_free - status;
350     buffer_used = buffer_used + status;
351
352     if (buffer[buffer_used - 1] == '\n')
353       break;
354   }
355
356   assert (buffer_used > 0);
357
358   if (buffer[buffer_used - 1] != '\n')
359   {
360     errno = ENOBUFS;
361     return (-1);
362   }
363
364   buffer[buffer_used - 1] = 0;
365
366   /* Fix network line endings. */
367   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
368   {
369     buffer_used--;
370     buffer[buffer_used - 1] = 0;
371   }
372
373   return (buffer_used);
374 } /* }}} ssize_t sread */
375
376 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
377 {
378   const char *ptr;
379   size_t      nleft;
380   ssize_t     status;
381
382   /* special case for journal replay */
383   if (fd < 0) return 0;
384
385   ptr   = (const char *) buf;
386   nleft = count;
387
388   while (nleft > 0)
389   {
390     status = write (fd, (const void *) ptr, nleft);
391
392     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
393       continue;
394
395     if (status < 0)
396       return (status);
397
398     nleft -= status;
399     ptr   += status;
400   }
401
402   return (0);
403 } /* }}} ssize_t swrite */
404
405 static void _wipe_ci_values(cache_item_t *ci, time_t when)
406 {
407   ci->values = NULL;
408   ci->values_num = 0;
409
410   ci->last_flush_time = when;
411   if (config_write_jitter > 0)
412     ci->last_flush_time += (random() % config_write_jitter);
413
414   ci->flags &= ~(CI_FLAGS_IN_QUEUE);
415 }
416
417 /*
418  * enqueue_cache_item:
419  * `cache_lock' must be acquired before calling this function!
420  */
421 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
422     queue_side_t side)
423 {
424   int did_insert = 0;
425
426   if (ci == NULL)
427     return (-1);
428
429   if (ci->values_num == 0)
430     return (0);
431
432   if (side == HEAD)
433   {
434     if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
435     {
436       assert (ci->next == NULL);
437       ci->next = cache_queue_head;
438       cache_queue_head = ci;
439
440       if (cache_queue_tail == NULL)
441         cache_queue_tail = cache_queue_head;
442
443       did_insert = 1;
444     }
445     else if (cache_queue_head == ci)
446     {
447       /* do nothing */
448     }
449     else /* enqueued, but not first entry */
450     {
451       cache_item_t *prev;
452
453       /* find previous entry */
454       for (prev = cache_queue_head; prev != NULL; prev = prev->next)
455         if (prev->next == ci)
456           break;
457       assert (prev != NULL);
458
459       /* move to the front */
460       prev->next = ci->next;
461       ci->next = cache_queue_head;
462       cache_queue_head = ci;
463
464       /* check if we need to adapt the tail */
465       if (cache_queue_tail == ci)
466         cache_queue_tail = prev;
467     }
468   }
469   else /* (side == TAIL) */
470   {
471     /* We don't move values back in the list.. */
472     if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
473       return (0);
474
475     assert (ci->next == NULL);
476
477     if (cache_queue_tail == NULL)
478       cache_queue_head = ci;
479     else
480       cache_queue_tail->next = ci;
481     cache_queue_tail = ci;
482
483     did_insert = 1;
484   }
485
486   ci->flags |= CI_FLAGS_IN_QUEUE;
487
488   if (did_insert)
489   {
490     pthread_cond_broadcast(&cache_cond);
491     pthread_mutex_lock (&stats_lock);
492     stats_queue_length++;
493     pthread_mutex_unlock (&stats_lock);
494   }
495
496   return (0);
497 } /* }}} int enqueue_cache_item */
498
499 /*
500  * tree_callback_flush:
501  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
502  * while this is in progress.
503  */
504 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
505     gpointer data)
506 {
507   cache_item_t *ci;
508   callback_flush_data_t *cfd;
509
510   ci = (cache_item_t *) value;
511   cfd = (callback_flush_data_t *) data;
512
513   if ((ci->last_flush_time <= cfd->abs_timeout)
514       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
515       && (ci->values_num > 0))
516   {
517     enqueue_cache_item (ci, TAIL);
518   }
519   else if ((do_shutdown != 0)
520       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
521       && (ci->values_num > 0))
522   {
523     enqueue_cache_item (ci, TAIL);
524   }
525   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
526       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
527       && (ci->values_num <= 0))
528   {
529     char **temp;
530
531     temp = (char **) realloc (cfd->keys,
532         sizeof (char *) * (cfd->keys_num + 1));
533     if (temp == NULL)
534     {
535       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
536       return (FALSE);
537     }
538     cfd->keys = temp;
539     /* Make really sure this points to the _same_ place */
540     assert ((char *) key == ci->file);
541     cfd->keys[cfd->keys_num] = (char *) key;
542     cfd->keys_num++;
543   }
544
545   return (FALSE);
546 } /* }}} gboolean tree_callback_flush */
547
548 static int flush_old_values (int max_age)
549 {
550   callback_flush_data_t cfd;
551   size_t k;
552
553   memset (&cfd, 0, sizeof (cfd));
554   /* Pass the current time as user data so that we don't need to call
555    * `time' for each node. */
556   cfd.now = time (NULL);
557   cfd.keys = NULL;
558   cfd.keys_num = 0;
559
560   if (max_age > 0)
561     cfd.abs_timeout = cfd.now - max_age;
562   else
563     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
564
565   /* `tree_callback_flush' will return the keys of all values that haven't
566    * been touched in the last `config_flush_interval' seconds in `cfd'.
567    * The char*'s in this array point to the same memory as ci->file, so we
568    * don't need to free them separately. */
569   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
570
571   for (k = 0; k < cfd.keys_num; k++)
572   {
573     cache_item_t *ci;
574
575     /* This must not fail. */
576     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
577     assert (ci != NULL);
578
579     /* If we end up here with values available, something's seriously
580      * messed up. */
581     assert (ci->values_num == 0);
582
583     /* Remove the node from the tree */
584     g_tree_remove (cache_tree, cfd.keys[k]);
585     cfd.keys[k] = NULL;
586
587     /* Now free and clean up `ci'. */
588     free (ci->file);
589     ci->file = NULL;
590     free (ci);
591     ci = NULL;
592   } /* for (k = 0; k < cfd.keys_num; k++) */
593
594   if (cfd.keys != NULL)
595   {
596     free (cfd.keys);
597     cfd.keys = NULL;
598   }
599
600   return (0);
601 } /* int flush_old_values */
602
603 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
604 {
605   struct timeval now;
606   struct timespec next_flush;
607   int final_flush = 0; /* make sure we only flush once on shutdown */
608
609   gettimeofday (&now, NULL);
610   next_flush.tv_sec = now.tv_sec + config_flush_interval;
611   next_flush.tv_nsec = 1000 * now.tv_usec;
612
613   pthread_mutex_lock (&cache_lock);
614   while ((do_shutdown == 0) || (cache_queue_head != NULL))
615   {
616     cache_item_t *ci;
617     char *file;
618     char **values;
619     int values_num;
620     int status;
621     int i;
622
623     /* First, check if it's time to do the cache flush. */
624     gettimeofday (&now, NULL);
625     if ((now.tv_sec > next_flush.tv_sec)
626         || ((now.tv_sec == next_flush.tv_sec)
627           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
628     {
629       /* Flush all values that haven't been written in the last
630        * `config_write_interval' seconds. */
631       flush_old_values (config_write_interval);
632
633       /* Determine the time of the next cache flush. */
634       while (next_flush.tv_sec <= now.tv_sec)
635         next_flush.tv_sec += config_flush_interval;
636
637       /* unlock the cache while we rotate so we don't block incoming
638        * updates if the fsync() blocks on disk I/O */
639       pthread_mutex_unlock(&cache_lock);
640       journal_rotate();
641       pthread_mutex_lock(&cache_lock);
642     }
643
644     /* Now, check if there's something to store away. If not, wait until
645      * something comes in or it's time to do the cache flush.  if we are
646      * shutting down, do not wait around.  */
647     if (cache_queue_head == NULL && !do_shutdown)
648     {
649       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
650       if ((status != 0) && (status != ETIMEDOUT))
651       {
652         RRDD_LOG (LOG_ERR, "queue_thread_main: "
653             "pthread_cond_timedwait returned %i.", status);
654       }
655     }
656
657     /* We're about to shut down */
658     if (do_shutdown != 0 && !final_flush++)
659     {
660       if (config_flush_at_shutdown)
661         flush_old_values (-1); /* flush everything */
662       else
663         break;
664     }
665
666     /* Check if a value has arrived. This may be NULL if we timed out or there
667      * was an interrupt such as a signal. */
668     if (cache_queue_head == NULL)
669       continue;
670
671     ci = cache_queue_head;
672
673     /* copy the relevant parts */
674     file = strdup (ci->file);
675     if (file == NULL)
676     {
677       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
678       continue;
679     }
680
681     assert(ci->values != NULL);
682     assert(ci->values_num > 0);
683
684     values = ci->values;
685     values_num = ci->values_num;
686
687     _wipe_ci_values(ci, time(NULL));
688
689     cache_queue_head = ci->next;
690     if (cache_queue_head == NULL)
691       cache_queue_tail = NULL;
692     ci->next = NULL;
693
694     pthread_mutex_lock (&stats_lock);
695     assert (stats_queue_length > 0);
696     stats_queue_length--;
697     pthread_mutex_unlock (&stats_lock);
698
699     pthread_mutex_unlock (&cache_lock);
700
701     rrd_clear_error ();
702     status = rrd_update_r (file, NULL, values_num, (void *) values);
703     if (status != 0)
704     {
705       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
706           "rrd_update_r (%s) failed with status %i. (%s)",
707           file, status, rrd_get_error());
708     }
709
710     journal_write("wrote", file);
711     pthread_cond_broadcast(&ci->flushed);
712
713     for (i = 0; i < values_num; i++)
714       free (values[i]);
715
716     free(values);
717     free(file);
718
719     if (status == 0)
720     {
721       pthread_mutex_lock (&stats_lock);
722       stats_updates_written++;
723       stats_data_sets_written += values_num;
724       pthread_mutex_unlock (&stats_lock);
725     }
726
727     pthread_mutex_lock (&cache_lock);
728
729     /* We're about to shut down */
730     if (do_shutdown != 0 && !final_flush++)
731     {
732       if (config_flush_at_shutdown)
733           flush_old_values (-1); /* flush everything */
734       else
735         break;
736     }
737   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
738   pthread_mutex_unlock (&cache_lock);
739
740   if (config_flush_at_shutdown)
741   {
742     assert(cache_queue_head == NULL);
743     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
744   }
745
746   journal_done();
747
748   return (NULL);
749 } /* }}} void *queue_thread_main */
750
751 static int buffer_get_field (char **buffer_ret, /* {{{ */
752     size_t *buffer_size_ret, char **field_ret)
753 {
754   char *buffer;
755   size_t buffer_pos;
756   size_t buffer_size;
757   char *field;
758   size_t field_size;
759   int status;
760
761   buffer = *buffer_ret;
762   buffer_pos = 0;
763   buffer_size = *buffer_size_ret;
764   field = *buffer_ret;
765   field_size = 0;
766
767   if (buffer_size <= 0)
768     return (-1);
769
770   /* This is ensured by `handle_request'. */
771   assert (buffer[buffer_size - 1] == '\0');
772
773   status = -1;
774   while (buffer_pos < buffer_size)
775   {
776     /* Check for end-of-field or end-of-buffer */
777     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
778     {
779       field[field_size] = 0;
780       field_size++;
781       buffer_pos++;
782       status = 0;
783       break;
784     }
785     /* Handle escaped characters. */
786     else if (buffer[buffer_pos] == '\\')
787     {
788       if (buffer_pos >= (buffer_size - 1))
789         break;
790       buffer_pos++;
791       field[field_size] = buffer[buffer_pos];
792       field_size++;
793       buffer_pos++;
794     }
795     /* Normal operation */ 
796     else
797     {
798       field[field_size] = buffer[buffer_pos];
799       field_size++;
800       buffer_pos++;
801     }
802   } /* while (buffer_pos < buffer_size) */
803
804   if (status != 0)
805     return (status);
806
807   *buffer_ret = buffer + buffer_pos;
808   *buffer_size_ret = buffer_size - buffer_pos;
809   *field_ret = field;
810
811   return (0);
812 } /* }}} int buffer_get_field */
813
814 /* if we're restricting writes to the base directory,
815  * check whether the file falls within the dir
816  * returns 1 if OK, otherwise 0
817  */
818 static int check_file_access (const char *file, int fd) /* {{{ */
819 {
820   char error[CMD_MAX];
821   assert(file != NULL);
822
823   if (!config_write_base_only
824       || fd < 0 /* journal replay */
825       || config_base_dir == NULL)
826     return 1;
827
828   if (strstr(file, "../") != NULL) goto err;
829
830   /* relative paths without "../" are ok */
831   if (*file != '/') return 1;
832
833   /* file must be of the format base + "/" + <1+ char filename> */
834   if (strlen(file) < _config_base_dir_len + 2) goto err;
835   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
836   if (*(file + _config_base_dir_len) != '/') goto err;
837
838   return 1;
839
840 err:
841   snprintf(error, sizeof(error)-1, "-1 %s\n", rrd_strerror(EACCES));
842   swrite(fd, error, strlen(error));
843   return 0;
844 } /* }}} static int check_file_access */
845
846 static int flush_file (const char *filename) /* {{{ */
847 {
848   cache_item_t *ci;
849
850   pthread_mutex_lock (&cache_lock);
851
852   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
853   if (ci == NULL)
854   {
855     pthread_mutex_unlock (&cache_lock);
856     return (ENOENT);
857   }
858
859   if (ci->values_num > 0)
860   {
861     /* Enqueue at head */
862     enqueue_cache_item (ci, HEAD);
863     pthread_cond_wait(&ci->flushed, &cache_lock);
864   }
865
866   pthread_mutex_unlock(&cache_lock);
867
868   return (0);
869 } /* }}} int flush_file */
870
871 static int handle_request_help (int fd, /* {{{ */
872     char *buffer, size_t buffer_size)
873 {
874   int status;
875   char **help_text;
876   size_t help_text_len;
877   char *command;
878   size_t i;
879
880   char *help_help[] =
881   {
882     "5 Command overview\n",
883     "FLUSH <filename>\n",
884     "FLUSHALL\n",
885     "HELP [<command>]\n",
886     "UPDATE <filename> <values> [<values> ...]\n",
887     "STATS\n"
888   };
889   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
890
891   char *help_flush[] =
892   {
893     "4 Help for FLUSH\n",
894     "Usage: FLUSH <filename>\n",
895     "\n",
896     "Adds the given filename to the head of the update queue and returns\n",
897     "after is has been dequeued.\n"
898   };
899   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
900
901   char *help_flushall[] =
902   {
903     "3 Help for FLUSHALL\n",
904     "Usage: FLUSHALL\n",
905     "\n",
906     "Triggers writing of all pending updates.  Returns immediately.\n"
907   };
908   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
909
910   char *help_update[] =
911   {
912     "9 Help for UPDATE\n",
913     "Usage: UPDATE <filename> <values> [<values> ...]\n"
914     "\n",
915     "Adds the given file to the internal cache if it is not yet known and\n",
916     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
917     "for details.\n",
918     "\n",
919     "Each <values> has the following form:\n",
920     "  <values> = <time>:<value>[:<value>[...]]\n",
921     "See the rrdupdate(1) manpage for details.\n"
922   };
923   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
924
925   char *help_stats[] =
926   {
927     "4 Help for STATS\n",
928     "Usage: STATS\n",
929     "\n",
930     "Returns some performance counters, see the rrdcached(1) manpage for\n",
931     "a description of the values.\n"
932   };
933   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
934
935   status = buffer_get_field (&buffer, &buffer_size, &command);
936   if (status != 0)
937   {
938     help_text = help_help;
939     help_text_len = help_help_len;
940   }
941   else
942   {
943     if (strcasecmp (command, "update") == 0)
944     {
945       help_text = help_update;
946       help_text_len = help_update_len;
947     }
948     else if (strcasecmp (command, "flush") == 0)
949     {
950       help_text = help_flush;
951       help_text_len = help_flush_len;
952     }
953     else if (strcasecmp (command, "flushall") == 0)
954     {
955       help_text = help_flushall;
956       help_text_len = help_flushall_len;
957     }
958     else if (strcasecmp (command, "stats") == 0)
959     {
960       help_text = help_stats;
961       help_text_len = help_stats_len;
962     }
963     else
964     {
965       help_text = help_help;
966       help_text_len = help_help_len;
967     }
968   }
969
970   for (i = 0; i < help_text_len; i++)
971   {
972     status = swrite (fd, help_text[i], strlen (help_text[i]));
973     if (status < 0)
974     {
975       status = errno;
976       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
977       return (status);
978     }
979   }
980
981   return (0);
982 } /* }}} int handle_request_help */
983
984 static int handle_request_stats (int fd, /* {{{ */
985     char *buffer __attribute__((unused)),
986     size_t buffer_size __attribute__((unused)))
987 {
988   int status;
989   char outbuf[CMD_MAX];
990
991   uint64_t copy_queue_length;
992   uint64_t copy_updates_received;
993   uint64_t copy_flush_received;
994   uint64_t copy_updates_written;
995   uint64_t copy_data_sets_written;
996   uint64_t copy_journal_bytes;
997   uint64_t copy_journal_rotate;
998
999   uint64_t tree_nodes_number;
1000   uint64_t tree_depth;
1001
1002   pthread_mutex_lock (&stats_lock);
1003   copy_queue_length       = stats_queue_length;
1004   copy_updates_received   = stats_updates_received;
1005   copy_flush_received     = stats_flush_received;
1006   copy_updates_written    = stats_updates_written;
1007   copy_data_sets_written  = stats_data_sets_written;
1008   copy_journal_bytes      = stats_journal_bytes;
1009   copy_journal_rotate     = stats_journal_rotate;
1010   pthread_mutex_unlock (&stats_lock);
1011
1012   pthread_mutex_lock (&cache_lock);
1013   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1014   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1015   pthread_mutex_unlock (&cache_lock);
1016
1017 #define RRDD_STATS_SEND \
1018   outbuf[sizeof (outbuf) - 1] = 0; \
1019   status = swrite (fd, outbuf, strlen (outbuf)); \
1020   if (status < 0) \
1021   { \
1022     status = errno; \
1023     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
1024     return (status); \
1025   }
1026
1027   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
1028   RRDD_STATS_SEND;
1029
1030   snprintf (outbuf, sizeof (outbuf),
1031       "QueueLength: %"PRIu64"\n", copy_queue_length);
1032   RRDD_STATS_SEND;
1033
1034   snprintf (outbuf, sizeof (outbuf),
1035       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1036   RRDD_STATS_SEND;
1037
1038   snprintf (outbuf, sizeof (outbuf),
1039       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1040   RRDD_STATS_SEND;
1041
1042   snprintf (outbuf, sizeof (outbuf),
1043       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1044   RRDD_STATS_SEND;
1045
1046   snprintf (outbuf, sizeof (outbuf),
1047       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1048   RRDD_STATS_SEND;
1049
1050   snprintf (outbuf, sizeof (outbuf),
1051       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1052   RRDD_STATS_SEND;
1053
1054   snprintf (outbuf, sizeof (outbuf),
1055       "TreeDepth: %"PRIu64"\n", tree_depth);
1056   RRDD_STATS_SEND;
1057
1058   snprintf (outbuf, sizeof(outbuf),
1059       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1060   RRDD_STATS_SEND;
1061
1062   snprintf (outbuf, sizeof(outbuf),
1063       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1064   RRDD_STATS_SEND;
1065
1066   return (0);
1067 #undef RRDD_STATS_SEND
1068 } /* }}} int handle_request_stats */
1069
1070 static int handle_request_flush (int fd, /* {{{ */
1071     char *buffer, size_t buffer_size)
1072 {
1073   char *file;
1074   int status;
1075   char result[CMD_MAX];
1076
1077   status = buffer_get_field (&buffer, &buffer_size, &file);
1078   if (status != 0)
1079   {
1080     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1081   }
1082   else
1083   {
1084     pthread_mutex_lock(&stats_lock);
1085     stats_flush_received++;
1086     pthread_mutex_unlock(&stats_lock);
1087
1088     if (!check_file_access(file, fd)) return 0;
1089
1090     status = flush_file (file);
1091     if (status == 0)
1092       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1093     else if (status == ENOENT)
1094     {
1095       /* no file in our tree; see whether it exists at all */
1096       struct stat statbuf;
1097
1098       memset(&statbuf, 0, sizeof(statbuf));
1099       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1100         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1101       else
1102         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1103     }
1104     else if (status < 0)
1105       strncpy (result, "-1 Internal error.\n", sizeof (result));
1106     else
1107       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1108   }
1109   result[sizeof (result) - 1] = 0;
1110
1111   status = swrite (fd, result, strlen (result));
1112   if (status < 0)
1113   {
1114     status = errno;
1115     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1116     return (status);
1117   }
1118
1119   return (0);
1120 } /* }}} int handle_request_flush */
1121
1122 static int handle_request_flushall(int fd) /* {{{ */
1123 {
1124   int status;
1125   char answer[] ="0 Started flush.\n";
1126
1127   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1128
1129   pthread_mutex_lock(&cache_lock);
1130   flush_old_values(-1);
1131   pthread_mutex_unlock(&cache_lock);
1132
1133   status = swrite(fd, answer, strlen(answer));
1134   if (status < 0)
1135   {
1136     status = errno;
1137     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1138   }
1139
1140   return (status);
1141 } /* }}} static int handle_request_flushall */
1142
1143 static int handle_request_update (int fd, /* {{{ */
1144     char *buffer, size_t buffer_size)
1145 {
1146   char *file;
1147   int values_num = 0;
1148   int status;
1149
1150   time_t now;
1151
1152   cache_item_t *ci;
1153   char answer[CMD_MAX];
1154
1155 #define RRDD_UPDATE_SEND \
1156   answer[sizeof (answer) - 1] = 0; \
1157   status = swrite (fd, answer, strlen (answer)); \
1158   if (status < 0) \
1159   { \
1160     status = errno; \
1161     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1162     return (status); \
1163   }
1164
1165   now = time (NULL);
1166
1167   status = buffer_get_field (&buffer, &buffer_size, &file);
1168   if (status != 0)
1169   {
1170     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1171         sizeof (answer));
1172     RRDD_UPDATE_SEND;
1173     return (0);
1174   }
1175
1176   pthread_mutex_lock(&stats_lock);
1177   stats_updates_received++;
1178   pthread_mutex_unlock(&stats_lock);
1179
1180   if (!check_file_access(file, fd)) return 0;
1181
1182   pthread_mutex_lock (&cache_lock);
1183   ci = g_tree_lookup (cache_tree, file);
1184
1185   if (ci == NULL) /* {{{ */
1186   {
1187     struct stat statbuf;
1188
1189     /* don't hold the lock while we setup; stat(2) might block */
1190     pthread_mutex_unlock(&cache_lock);
1191
1192     memset (&statbuf, 0, sizeof (statbuf));
1193     status = stat (file, &statbuf);
1194     if (status != 0)
1195     {
1196       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1197
1198       status = errno;
1199       if (status == ENOENT)
1200         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1201       else
1202         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1203             status);
1204       RRDD_UPDATE_SEND;
1205       return (0);
1206     }
1207     if (!S_ISREG (statbuf.st_mode))
1208     {
1209       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1210       RRDD_UPDATE_SEND;
1211       return (0);
1212     }
1213     if (access(file, R_OK|W_OK) != 0)
1214     {
1215       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1216                 file, rrd_strerror(errno));
1217       RRDD_UPDATE_SEND;
1218       return (0);
1219     }
1220
1221     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1222     if (ci == NULL)
1223     {
1224       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1225
1226       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1227       RRDD_UPDATE_SEND;
1228       return (0);
1229     }
1230     memset (ci, 0, sizeof (cache_item_t));
1231
1232     ci->file = strdup (file);
1233     if (ci->file == NULL)
1234     {
1235       free (ci);
1236       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1237
1238       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1239       RRDD_UPDATE_SEND;
1240       return (0);
1241     }
1242
1243     _wipe_ci_values(ci, now);
1244     ci->flags = CI_FLAGS_IN_TREE;
1245
1246     pthread_mutex_lock(&cache_lock);
1247     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1248   } /* }}} */
1249   assert (ci != NULL);
1250
1251   while (buffer_size > 0)
1252   {
1253     char **temp;
1254     char *value;
1255
1256     status = buffer_get_field (&buffer, &buffer_size, &value);
1257     if (status != 0)
1258     {
1259       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1260       break;
1261     }
1262
1263     temp = (char **) realloc (ci->values,
1264         sizeof (char *) * (ci->values_num + 1));
1265     if (temp == NULL)
1266     {
1267       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1268       continue;
1269     }
1270     ci->values = temp;
1271
1272     ci->values[ci->values_num] = strdup (value);
1273     if (ci->values[ci->values_num] == NULL)
1274     {
1275       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1276       continue;
1277     }
1278     ci->values_num++;
1279
1280     values_num++;
1281   }
1282
1283   if (((now - ci->last_flush_time) >= config_write_interval)
1284       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1285       && (ci->values_num > 0))
1286   {
1287     enqueue_cache_item (ci, TAIL);
1288   }
1289
1290   pthread_mutex_unlock (&cache_lock);
1291
1292   if (values_num < 1)
1293   {
1294     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1295   }
1296   else
1297   {
1298     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1299         (values_num == 1) ? "" : "s");
1300   }
1301   RRDD_UPDATE_SEND;
1302   return (0);
1303 #undef RRDD_UPDATE_SEND
1304 } /* }}} int handle_request_update */
1305
1306 /* we came across a "WROTE" entry during journal replay.
1307  * throw away any values that we have accumulated for this file
1308  */
1309 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1310                                  const char *buffer,
1311                                  size_t buffer_size __attribute__((unused)))
1312 {
1313   int i;
1314   cache_item_t *ci;
1315   const char *file = buffer;
1316
1317   pthread_mutex_lock(&cache_lock);
1318
1319   ci = g_tree_lookup(cache_tree, file);
1320   if (ci == NULL)
1321   {
1322     pthread_mutex_unlock(&cache_lock);
1323     return (0);
1324   }
1325
1326   if (ci->values)
1327   {
1328     for (i=0; i < ci->values_num; i++)
1329       free(ci->values[i]);
1330
1331     free(ci->values);
1332   }
1333
1334   _wipe_ci_values(ci, time(NULL));
1335
1336   pthread_mutex_unlock(&cache_lock);
1337   return (0);
1338 } /* }}} int handle_request_wrote */
1339
1340 /* returns 1 if we have the required privilege level */
1341 static int has_privilege (socket_privilege priv, /* {{{ */
1342                           socket_privilege required, int fd)
1343 {
1344   int status;
1345   char error[CMD_MAX];
1346
1347   if (priv >= required)
1348     return 1;
1349
1350   sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1351   status = swrite(fd, error, strlen(error));
1352
1353   if (status < 0)
1354     return status;
1355   else
1356     return 0;
1357 } /* }}} static int has_privilege */
1358
1359 /* if fd < 0, we are in journal replay mode */
1360 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1361                            char *buffer, size_t buffer_size)
1362 {
1363   char *buffer_ptr;
1364   char *command;
1365   int status;
1366
1367   assert (buffer[buffer_size - 1] == '\0');
1368
1369   buffer_ptr = buffer;
1370   command = NULL;
1371   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1372   if (status != 0)
1373   {
1374     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1375     return (-1);
1376   }
1377
1378   if (strcasecmp (command, "update") == 0)
1379   {
1380     status = has_privilege(privilege, PRIV_HIGH, fd);
1381     if (status <= 0)
1382       return status;
1383
1384     /* don't re-write updates in replay mode */
1385     if (fd >= 0)
1386       journal_write(command, buffer_ptr);
1387
1388     return (handle_request_update (fd, buffer_ptr, buffer_size));
1389   }
1390   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1391   {
1392     /* this is only valid in replay mode */
1393     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1394   }
1395   else if (strcasecmp (command, "flush") == 0)
1396   {
1397     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1398   }
1399   else if (strcasecmp (command, "flushall") == 0)
1400   {
1401     status = has_privilege(privilege, PRIV_HIGH, fd);
1402     if (status <= 0)
1403       return status;
1404
1405     return (handle_request_flushall(fd));
1406   }
1407   else if (strcasecmp (command, "stats") == 0)
1408   {
1409     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1410   }
1411   else if (strcasecmp (command, "help") == 0)
1412   {
1413     return (handle_request_help (fd, buffer_ptr, buffer_size));
1414   }
1415   else
1416   {
1417     char result[CMD_MAX];
1418
1419     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1420     result[sizeof (result) - 1] = 0;
1421
1422     status = swrite (fd, result, strlen (result));
1423     if (status < 0)
1424     {
1425       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1426       return (-1);
1427     }
1428   }
1429
1430   return (0);
1431 } /* }}} int handle_request */
1432
1433 /* MUST NOT hold journal_lock before calling this */
1434 static void journal_rotate(void) /* {{{ */
1435 {
1436   FILE *old_fh = NULL;
1437
1438   if (journal_cur == NULL || journal_old == NULL)
1439     return;
1440
1441   pthread_mutex_lock(&journal_lock);
1442
1443   /* we rotate this way (rename before close) so that the we can release
1444    * the journal lock as fast as possible.  Journal writes to the new
1445    * journal can proceed immediately after the new file is opened.  The
1446    * fclose can then block without affecting new updates.
1447    */
1448   if (journal_fh != NULL)
1449   {
1450     old_fh = journal_fh;
1451     rename(journal_cur, journal_old);
1452     ++stats_journal_rotate;
1453   }
1454
1455   journal_fh = fopen(journal_cur, "a");
1456   pthread_mutex_unlock(&journal_lock);
1457
1458   if (old_fh != NULL)
1459     fclose(old_fh);
1460
1461   if (journal_fh == NULL)
1462   {
1463     RRDD_LOG(LOG_CRIT,
1464              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1465              journal_cur, rrd_strerror(errno));
1466
1467     RRDD_LOG(LOG_ERR,
1468              "JOURNALING DISABLED: All values will be flushed at shutdown");
1469     config_flush_at_shutdown = 1;
1470   }
1471
1472 } /* }}} static void journal_rotate */
1473
1474 static void journal_done(void) /* {{{ */
1475 {
1476   if (journal_cur == NULL)
1477     return;
1478
1479   pthread_mutex_lock(&journal_lock);
1480   if (journal_fh != NULL)
1481   {
1482     fclose(journal_fh);
1483     journal_fh = NULL;
1484   }
1485
1486   if (config_flush_at_shutdown)
1487   {
1488     RRDD_LOG(LOG_INFO, "removing journals");
1489     unlink(journal_old);
1490     unlink(journal_cur);
1491   }
1492   else
1493   {
1494     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1495              "journals will be used at next startup");
1496   }
1497
1498   pthread_mutex_unlock(&journal_lock);
1499
1500 } /* }}} static void journal_done */
1501
1502 static int journal_write(char *cmd, char *args) /* {{{ */
1503 {
1504   int chars;
1505
1506   if (journal_fh == NULL)
1507     return 0;
1508
1509   pthread_mutex_lock(&journal_lock);
1510   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1511   pthread_mutex_unlock(&journal_lock);
1512
1513   if (chars > 0)
1514   {
1515     pthread_mutex_lock(&stats_lock);
1516     stats_journal_bytes += chars;
1517     pthread_mutex_unlock(&stats_lock);
1518   }
1519
1520   return chars;
1521 } /* }}} static int journal_write */
1522
1523 static int journal_replay (const char *file) /* {{{ */
1524 {
1525   FILE *fh;
1526   int entry_cnt = 0;
1527   int fail_cnt = 0;
1528   uint64_t line = 0;
1529   char entry[CMD_MAX];
1530
1531   if (file == NULL) return 0;
1532
1533   fh = fopen(file, "r");
1534   if (fh == NULL)
1535   {
1536     if (errno != ENOENT)
1537       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1538                file, rrd_strerror(errno));
1539     return 0;
1540   }
1541   else
1542     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1543
1544   while(!feof(fh))
1545   {
1546     size_t entry_len;
1547
1548     ++line;
1549     if (fgets(entry, sizeof(entry), fh) == NULL)
1550       break;
1551     entry_len = strlen(entry);
1552
1553     /* check \n termination in case journal writing crashed mid-line */
1554     if (entry_len == 0)
1555       continue;
1556     else if (entry[entry_len - 1] != '\n')
1557     {
1558       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1559       ++fail_cnt;
1560       continue;
1561     }
1562
1563     entry[entry_len - 1] = '\0';
1564
1565     if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1566       ++entry_cnt;
1567     else
1568       ++fail_cnt;
1569   }
1570
1571   fclose(fh);
1572
1573   if (entry_cnt > 0)
1574   {
1575     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1576              entry_cnt, fail_cnt);
1577     return 1;
1578   }
1579   else
1580     return 0;
1581
1582 } /* }}} static int journal_replay */
1583
1584 static void *connection_thread_main (void *args) /* {{{ */
1585 {
1586   pthread_t self;
1587   listen_socket_t *sock;
1588   int i;
1589   int fd;
1590
1591   sock = (listen_socket_t *) args;
1592   fd = sock->fd;
1593
1594   pthread_mutex_lock (&connection_threads_lock);
1595   {
1596     pthread_t *temp;
1597
1598     temp = (pthread_t *) realloc (connection_threads,
1599         sizeof (pthread_t) * (connection_threads_num + 1));
1600     if (temp == NULL)
1601     {
1602       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1603     }
1604     else
1605     {
1606       connection_threads = temp;
1607       connection_threads[connection_threads_num] = pthread_self ();
1608       connection_threads_num++;
1609     }
1610   }
1611   pthread_mutex_unlock (&connection_threads_lock);
1612
1613   while (do_shutdown == 0)
1614   {
1615     char buffer[CMD_MAX];
1616
1617     struct pollfd pollfd;
1618     int status;
1619
1620     pollfd.fd = fd;
1621     pollfd.events = POLLIN | POLLPRI;
1622     pollfd.revents = 0;
1623
1624     status = poll (&pollfd, 1, /* timeout = */ 500);
1625     if (do_shutdown)
1626       break;
1627     else if (status == 0) /* timeout */
1628       continue;
1629     else if (status < 0) /* error */
1630     {
1631       status = errno;
1632       if (status == EINTR)
1633         continue;
1634       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1635       continue;
1636     }
1637
1638     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1639     {
1640       close (fd);
1641       break;
1642     }
1643     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1644     {
1645       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1646           "poll(2) returned something unexpected: %#04hx",
1647           pollfd.revents);
1648       close (fd);
1649       break;
1650     }
1651
1652     status = (int) sread (fd, buffer, sizeof (buffer));
1653     if (status <= 0)
1654     {
1655       close (fd);
1656
1657       if (status < 0)
1658         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1659
1660       break;
1661     }
1662
1663     status = handle_request (fd, sock->privilege, buffer, status);
1664     if (status != 0)
1665       break;
1666   }
1667
1668   close(fd);
1669   free(args);
1670
1671   self = pthread_self ();
1672   /* Remove this thread from the connection threads list */
1673   pthread_mutex_lock (&connection_threads_lock);
1674   /* Find out own index in the array */
1675   for (i = 0; i < connection_threads_num; i++)
1676     if (pthread_equal (connection_threads[i], self) != 0)
1677       break;
1678   assert (i < connection_threads_num);
1679
1680   /* Move the trailing threads forward. */
1681   if (i < (connection_threads_num - 1))
1682   {
1683     memmove (connection_threads + i,
1684         connection_threads + i + 1,
1685         sizeof (pthread_t) * (connection_threads_num - i - 1));
1686   }
1687
1688   connection_threads_num--;
1689   pthread_mutex_unlock (&connection_threads_lock);
1690
1691   return (NULL);
1692 } /* }}} void *connection_thread_main */
1693
1694 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1695 {
1696   int fd;
1697   struct sockaddr_un sa;
1698   listen_socket_t *temp;
1699   int status;
1700   const char *path;
1701
1702   path = sock->addr;
1703   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1704     path += strlen("unix:");
1705
1706   temp = (listen_socket_t *) realloc (listen_fds,
1707       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1708   if (temp == NULL)
1709   {
1710     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1711     return (-1);
1712   }
1713   listen_fds = temp;
1714   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1715
1716   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1717   if (fd < 0)
1718   {
1719     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1720     return (-1);
1721   }
1722
1723   memset (&sa, 0, sizeof (sa));
1724   sa.sun_family = AF_UNIX;
1725   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1726
1727   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1728   if (status != 0)
1729   {
1730     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1731     close (fd);
1732     unlink (path);
1733     return (-1);
1734   }
1735
1736   status = listen (fd, /* backlog = */ 10);
1737   if (status != 0)
1738   {
1739     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1740     close (fd);
1741     unlink (path);
1742     return (-1);
1743   }
1744
1745   listen_fds[listen_fds_num].fd = fd;
1746   listen_fds[listen_fds_num].family = PF_UNIX;
1747   strncpy(listen_fds[listen_fds_num].addr, path,
1748           sizeof (listen_fds[listen_fds_num].addr) - 1);
1749   listen_fds_num++;
1750
1751   return (0);
1752 } /* }}} int open_listen_socket_unix */
1753
1754 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1755 {
1756   struct addrinfo ai_hints;
1757   struct addrinfo *ai_res;
1758   struct addrinfo *ai_ptr;
1759   char addr_copy[NI_MAXHOST];
1760   char *addr;
1761   char *port;
1762   int status;
1763
1764   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1765   addr_copy[sizeof (addr_copy) - 1] = 0;
1766   addr = addr_copy;
1767
1768   memset (&ai_hints, 0, sizeof (ai_hints));
1769   ai_hints.ai_flags = 0;
1770 #ifdef AI_ADDRCONFIG
1771   ai_hints.ai_flags |= AI_ADDRCONFIG;
1772 #endif
1773   ai_hints.ai_family = AF_UNSPEC;
1774   ai_hints.ai_socktype = SOCK_STREAM;
1775
1776   port = NULL;
1777   if (*addr == '[') /* IPv6+port format */
1778   {
1779     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1780     addr++;
1781
1782     port = strchr (addr, ']');
1783     if (port == NULL)
1784     {
1785       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1786           sock->addr);
1787       return (-1);
1788     }
1789     *port = 0;
1790     port++;
1791
1792     if (*port == ':')
1793       port++;
1794     else if (*port == 0)
1795       port = NULL;
1796     else
1797     {
1798       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1799           port);
1800       return (-1);
1801     }
1802   } /* if (*addr = ']') */
1803   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1804   {
1805     port = rindex(addr, ':');
1806     if (port != NULL)
1807     {
1808       *port = 0;
1809       port++;
1810     }
1811   }
1812   ai_res = NULL;
1813   status = getaddrinfo (addr,
1814                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1815                         &ai_hints, &ai_res);
1816   if (status != 0)
1817   {
1818     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1819         "%s", addr, gai_strerror (status));
1820     return (-1);
1821   }
1822
1823   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1824   {
1825     int fd;
1826     listen_socket_t *temp;
1827     int one = 1;
1828
1829     temp = (listen_socket_t *) realloc (listen_fds,
1830         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1831     if (temp == NULL)
1832     {
1833       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1834       continue;
1835     }
1836     listen_fds = temp;
1837     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1838
1839     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1840     if (fd < 0)
1841     {
1842       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1843       continue;
1844     }
1845
1846     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1847
1848     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1849     if (status != 0)
1850     {
1851       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1852       close (fd);
1853       continue;
1854     }
1855
1856     status = listen (fd, /* backlog = */ 10);
1857     if (status != 0)
1858     {
1859       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1860       close (fd);
1861       return (-1);
1862     }
1863
1864     listen_fds[listen_fds_num].fd = fd;
1865     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1866     listen_fds_num++;
1867   } /* for (ai_ptr) */
1868
1869   return (0);
1870 } /* }}} static int open_listen_socket_network */
1871
1872 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1873 {
1874   assert(sock != NULL);
1875   assert(sock->addr != NULL);
1876
1877   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1878       || sock->addr[0] == '/')
1879     return (open_listen_socket_unix(sock));
1880   else
1881     return (open_listen_socket_network(sock));
1882 } /* }}} int open_listen_socket */
1883
1884 static int close_listen_sockets (void) /* {{{ */
1885 {
1886   size_t i;
1887
1888   for (i = 0; i < listen_fds_num; i++)
1889   {
1890     close (listen_fds[i].fd);
1891
1892     if (listen_fds[i].family == PF_UNIX)
1893       unlink(listen_fds[i].addr);
1894   }
1895
1896   free (listen_fds);
1897   listen_fds = NULL;
1898   listen_fds_num = 0;
1899
1900   return (0);
1901 } /* }}} int close_listen_sockets */
1902
1903 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1904 {
1905   struct pollfd *pollfds;
1906   int pollfds_num;
1907   int status;
1908   int i;
1909
1910   for (i = 0; i < config_listen_address_list_len; i++)
1911     open_listen_socket (config_listen_address_list[i]);
1912
1913   if (config_listen_address_list_len < 1)
1914   {
1915     listen_socket_t sock;
1916     memset(&sock, 0, sizeof(sock));
1917     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1918     open_listen_socket (&sock);
1919   }
1920
1921   if (listen_fds_num < 1)
1922   {
1923     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1924         "could be opened. Sorry.");
1925     return (NULL);
1926   }
1927
1928   pollfds_num = listen_fds_num;
1929   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1930   if (pollfds == NULL)
1931   {
1932     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1933     return (NULL);
1934   }
1935   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1936
1937   RRDD_LOG(LOG_INFO, "listening for connections");
1938
1939   while (do_shutdown == 0)
1940   {
1941     assert (pollfds_num == ((int) listen_fds_num));
1942     for (i = 0; i < pollfds_num; i++)
1943     {
1944       pollfds[i].fd = listen_fds[i].fd;
1945       pollfds[i].events = POLLIN | POLLPRI;
1946       pollfds[i].revents = 0;
1947     }
1948
1949     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1950     if (do_shutdown)
1951       break;
1952     else if (status == 0) /* timeout */
1953       continue;
1954     else if (status < 0) /* error */
1955     {
1956       status = errno;
1957       if (status != EINTR)
1958       {
1959         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1960       }
1961       continue;
1962     }
1963
1964     for (i = 0; i < pollfds_num; i++)
1965     {
1966       listen_socket_t *client_sock;
1967       struct sockaddr_storage client_sa;
1968       socklen_t client_sa_size;
1969       pthread_t tid;
1970       pthread_attr_t attr;
1971
1972       if (pollfds[i].revents == 0)
1973         continue;
1974
1975       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1976       {
1977         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1978             "poll(2) returned something unexpected for listen FD #%i.",
1979             pollfds[i].fd);
1980         continue;
1981       }
1982
1983       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1984       if (client_sock == NULL)
1985       {
1986         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1987         continue;
1988       }
1989       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1990
1991       client_sa_size = sizeof (client_sa);
1992       client_sock->fd = accept (pollfds[i].fd,
1993           (struct sockaddr *) &client_sa, &client_sa_size);
1994       if (client_sock->fd < 0)
1995       {
1996         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1997         free(client_sock);
1998         continue;
1999       }
2000
2001       pthread_attr_init (&attr);
2002       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2003
2004       status = pthread_create (&tid, &attr, connection_thread_main,
2005                                client_sock);
2006       if (status != 0)
2007       {
2008         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2009         close (client_sock->fd);
2010         free (client_sock);
2011         continue;
2012       }
2013     } /* for (pollfds_num) */
2014   } /* while (do_shutdown == 0) */
2015
2016   RRDD_LOG(LOG_INFO, "starting shutdown");
2017
2018   close_listen_sockets ();
2019
2020   pthread_mutex_lock (&connection_threads_lock);
2021   while (connection_threads_num > 0)
2022   {
2023     pthread_t wait_for;
2024
2025     wait_for = connection_threads[0];
2026
2027     pthread_mutex_unlock (&connection_threads_lock);
2028     pthread_join (wait_for, /* retval = */ NULL);
2029     pthread_mutex_lock (&connection_threads_lock);
2030   }
2031   pthread_mutex_unlock (&connection_threads_lock);
2032
2033   return (NULL);
2034 } /* }}} void *listen_thread_main */
2035
2036 static int daemonize (void) /* {{{ */
2037 {
2038   int status;
2039   int fd;
2040   char *base_dir;
2041
2042   fd = open_pidfile();
2043   if (fd < 0) return fd;
2044
2045   if (!stay_foreground)
2046   {
2047     pid_t child;
2048
2049     child = fork ();
2050     if (child < 0)
2051     {
2052       fprintf (stderr, "daemonize: fork(2) failed.\n");
2053       return (-1);
2054     }
2055     else if (child > 0)
2056     {
2057       return (1);
2058     }
2059
2060     /* Become session leader */
2061     setsid ();
2062
2063     /* Open the first three file descriptors to /dev/null */
2064     close (2);
2065     close (1);
2066     close (0);
2067
2068     open ("/dev/null", O_RDWR);
2069     dup (0);
2070     dup (0);
2071   } /* if (!stay_foreground) */
2072
2073   /* Change into the /tmp directory. */
2074   base_dir = (config_base_dir != NULL)
2075     ? config_base_dir
2076     : "/tmp";
2077   status = chdir (base_dir);
2078   if (status != 0)
2079   {
2080     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2081     return (-1);
2082   }
2083
2084   install_signal_handlers();
2085
2086   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2087   RRDD_LOG(LOG_INFO, "starting up");
2088
2089   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2090   if (cache_tree == NULL)
2091   {
2092     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2093     return (-1);
2094   }
2095
2096   status = write_pidfile (fd);
2097   return status;
2098 } /* }}} int daemonize */
2099
2100 static int cleanup (void) /* {{{ */
2101 {
2102   do_shutdown++;
2103
2104   pthread_cond_signal (&cache_cond);
2105   pthread_join (queue_thread, /* return = */ NULL);
2106
2107   remove_pidfile ();
2108
2109   RRDD_LOG(LOG_INFO, "goodbye");
2110   closelog ();
2111
2112   return (0);
2113 } /* }}} int cleanup */
2114
2115 static int read_options (int argc, char **argv) /* {{{ */
2116 {
2117   int option;
2118   int status = 0;
2119
2120   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2121   {
2122     switch (option)
2123     {
2124       case 'g':
2125         stay_foreground=1;
2126         break;
2127
2128       case 'L':
2129       case 'l':
2130       {
2131         listen_socket_t **temp;
2132         listen_socket_t *new;
2133
2134         new = malloc(sizeof(listen_socket_t));
2135         if (new == NULL)
2136         {
2137           fprintf(stderr, "read_options: malloc failed.\n");
2138           return(2);
2139         }
2140         memset(new, 0, sizeof(listen_socket_t));
2141
2142         temp = (listen_socket_t **) realloc (config_listen_address_list,
2143             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2144         if (temp == NULL)
2145         {
2146           fprintf (stderr, "read_options: realloc failed.\n");
2147           return (2);
2148         }
2149         config_listen_address_list = temp;
2150
2151         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2152         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2153
2154         temp[config_listen_address_list_len] = new;
2155         config_listen_address_list_len++;
2156       }
2157       break;
2158
2159       case 'f':
2160       {
2161         int temp;
2162
2163         temp = atoi (optarg);
2164         if (temp > 0)
2165           config_flush_interval = temp;
2166         else
2167         {
2168           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2169           status = 3;
2170         }
2171       }
2172       break;
2173
2174       case 'w':
2175       {
2176         int temp;
2177
2178         temp = atoi (optarg);
2179         if (temp > 0)
2180           config_write_interval = temp;
2181         else
2182         {
2183           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2184           status = 2;
2185         }
2186       }
2187       break;
2188
2189       case 'z':
2190       {
2191         int temp;
2192
2193         temp = atoi(optarg);
2194         if (temp > 0)
2195           config_write_jitter = temp;
2196         else
2197         {
2198           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2199           status = 2;
2200         }
2201
2202         break;
2203       }
2204
2205       case 'B':
2206         config_write_base_only = 1;
2207         break;
2208
2209       case 'b':
2210       {
2211         size_t len;
2212
2213         if (config_base_dir != NULL)
2214           free (config_base_dir);
2215         config_base_dir = strdup (optarg);
2216         if (config_base_dir == NULL)
2217         {
2218           fprintf (stderr, "read_options: strdup failed.\n");
2219           return (3);
2220         }
2221
2222         len = strlen (config_base_dir);
2223         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2224         {
2225           config_base_dir[len - 1] = 0;
2226           len--;
2227         }
2228
2229         if (len < 1)
2230         {
2231           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2232           return (4);
2233         }
2234
2235         _config_base_dir_len = len;
2236       }
2237       break;
2238
2239       case 'p':
2240       {
2241         if (config_pid_file != NULL)
2242           free (config_pid_file);
2243         config_pid_file = strdup (optarg);
2244         if (config_pid_file == NULL)
2245         {
2246           fprintf (stderr, "read_options: strdup failed.\n");
2247           return (3);
2248         }
2249       }
2250       break;
2251
2252       case 'F':
2253         config_flush_at_shutdown = 1;
2254         break;
2255
2256       case 'j':
2257       {
2258         struct stat statbuf;
2259         const char *dir = optarg;
2260
2261         status = stat(dir, &statbuf);
2262         if (status != 0)
2263         {
2264           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2265           return 6;
2266         }
2267
2268         if (!S_ISDIR(statbuf.st_mode)
2269             || access(dir, R_OK|W_OK|X_OK) != 0)
2270         {
2271           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2272                   errno ? rrd_strerror(errno) : "");
2273           return 6;
2274         }
2275
2276         journal_cur = malloc(PATH_MAX + 1);
2277         journal_old = malloc(PATH_MAX + 1);
2278         if (journal_cur == NULL || journal_old == NULL)
2279         {
2280           fprintf(stderr, "malloc failure for journal files\n");
2281           return 6;
2282         }
2283         else 
2284         {
2285           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2286           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2287         }
2288       }
2289       break;
2290
2291       case 'h':
2292       case '?':
2293         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2294             "\n"
2295             "Usage: rrdcached [options]\n"
2296             "\n"
2297             "Valid options are:\n"
2298             "  -l <address>  Socket address to listen to.\n"
2299             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2300             "  -w <seconds>  Interval in which to write data.\n"
2301             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2302             "  -f <seconds>  Interval in which to flush dead data.\n"
2303             "  -p <file>     Location of the PID-file.\n"
2304             "  -b <dir>      Base directory to change to.\n"
2305             "  -B            Restrict file access to paths within -b <dir>\n"
2306             "  -g            Do not fork and run in the foreground.\n"
2307             "  -j <dir>      Directory in which to create the journal files.\n"
2308             "  -F            Always flush all updates at shutdown\n"
2309             "\n"
2310             "For more information and a detailed description of all options "
2311             "please refer\n"
2312             "to the rrdcached(1) manual page.\n",
2313             VERSION);
2314         status = -1;
2315         break;
2316     } /* switch (option) */
2317   } /* while (getopt) */
2318
2319   /* advise the user when values are not sane */
2320   if (config_flush_interval < 2 * config_write_interval)
2321     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2322             " 2x write interval (-w) !\n");
2323   if (config_write_jitter > config_write_interval)
2324     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2325             " write interval (-w) !\n");
2326
2327   if (config_write_base_only && config_base_dir == NULL)
2328     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2329             "  Consult the rrdcached documentation\n");
2330
2331   if (journal_cur == NULL)
2332     config_flush_at_shutdown = 1;
2333
2334   return (status);
2335 } /* }}} int read_options */
2336
2337 int main (int argc, char **argv)
2338 {
2339   int status;
2340
2341   status = read_options (argc, argv);
2342   if (status != 0)
2343   {
2344     if (status < 0)
2345       status = 0;
2346     return (status);
2347   }
2348
2349   status = daemonize ();
2350   if (status == 1)
2351   {
2352     struct sigaction sigchld;
2353
2354     memset (&sigchld, 0, sizeof (sigchld));
2355     sigchld.sa_handler = SIG_IGN;
2356     sigaction (SIGCHLD, &sigchld, NULL);
2357
2358     return (0);
2359   }
2360   else if (status != 0)
2361   {
2362     fprintf (stderr, "daemonize failed, exiting.\n");
2363     return (1);
2364   }
2365
2366   if (journal_cur != NULL)
2367   {
2368     int had_journal = 0;
2369
2370     pthread_mutex_lock(&journal_lock);
2371
2372     RRDD_LOG(LOG_INFO, "checking for journal files");
2373
2374     had_journal += journal_replay(journal_old);
2375     had_journal += journal_replay(journal_cur);
2376
2377     if (had_journal)
2378       flush_old_values(-1);
2379
2380     pthread_mutex_unlock(&journal_lock);
2381     journal_rotate();
2382
2383     RRDD_LOG(LOG_INFO, "journal processing complete");
2384   }
2385
2386   /* start the queue thread */
2387   memset (&queue_thread, 0, sizeof (queue_thread));
2388   status = pthread_create (&queue_thread,
2389                            NULL, /* attr */
2390                            queue_thread_main,
2391                            NULL); /* args */
2392   if (status != 0)
2393   {
2394     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2395     cleanup();
2396     return (1);
2397   }
2398
2399   listen_thread_main (NULL);
2400   cleanup ();
2401
2402   return (0);
2403 } /* int main */
2404
2405 /*
2406  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2407  */