Now, moving a value to the head of the queue is O(1). Before it was
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 struct listen_socket_s
111 {
112   int fd;
113   char addr[PATH_MAX + 1];
114   int family;
115   socket_privilege privilege;
116 };
117 typedef struct listen_socket_s listen_socket_t;
118
119 struct cache_item_s;
120 typedef struct cache_item_s cache_item_t;
121 struct cache_item_s
122 {
123   char *file;
124   char **values;
125   int values_num;
126   time_t last_flush_time;
127 #define CI_FLAGS_IN_TREE  (1<<0)
128 #define CI_FLAGS_IN_QUEUE (1<<1)
129   int flags;
130   pthread_cond_t  flushed;
131   cache_item_t *prev;
132   cache_item_t *next;
133 };
134
135 struct callback_flush_data_s
136 {
137   time_t now;
138   time_t abs_timeout;
139   char **keys;
140   size_t keys_num;
141 };
142 typedef struct callback_flush_data_s callback_flush_data_t;
143
144 enum queue_side_e
145 {
146   HEAD,
147   TAIL
148 };
149 typedef enum queue_side_e queue_side_t;
150
151 /* max length of socket command or response */
152 #define CMD_MAX 4096
153
154 /*
155  * Variables
156  */
157 static int stay_foreground = 0;
158
159 static listen_socket_t *listen_fds = NULL;
160 static size_t listen_fds_num = 0;
161
162 static int do_shutdown = 0;
163
164 static pthread_t queue_thread;
165
166 static pthread_t *connection_threads = NULL;
167 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
168 static int connection_threads_num = 0;
169
170 /* Cache stuff */
171 static GTree          *cache_tree = NULL;
172 static cache_item_t   *cache_queue_head = NULL;
173 static cache_item_t   *cache_queue_tail = NULL;
174 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
175 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
176
177 static int config_write_interval = 300;
178 static int config_write_jitter   = 0;
179 static int config_flush_interval = 3600;
180 static int config_flush_at_shutdown = 0;
181 static char *config_pid_file = NULL;
182 static char *config_base_dir = NULL;
183 static size_t _config_base_dir_len = 0;
184 static int config_write_base_only = 0;
185
186 static listen_socket_t **config_listen_address_list = NULL;
187 static int config_listen_address_list_len = 0;
188
189 static uint64_t stats_queue_length = 0;
190 static uint64_t stats_updates_received = 0;
191 static uint64_t stats_flush_received = 0;
192 static uint64_t stats_updates_written = 0;
193 static uint64_t stats_data_sets_written = 0;
194 static uint64_t stats_journal_bytes = 0;
195 static uint64_t stats_journal_rotate = 0;
196 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
197
198 /* Journaled updates */
199 static char *journal_cur = NULL;
200 static char *journal_old = NULL;
201 static FILE *journal_fh = NULL;
202 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
203 static int journal_write(char *cmd, char *args);
204 static void journal_done(void);
205 static void journal_rotate(void);
206
207 /* 
208  * Functions
209  */
210 static void sig_common (const char *sig) /* {{{ */
211 {
212   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
213   do_shutdown++;
214   pthread_cond_broadcast(&cache_cond);
215 } /* }}} void sig_common */
216
217 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
218 {
219   sig_common("INT");
220 } /* }}} void sig_int_handler */
221
222 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
223 {
224   sig_common("TERM");
225 } /* }}} void sig_term_handler */
226
227 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
228 {
229   config_flush_at_shutdown = 1;
230   sig_common("USR1");
231 } /* }}} void sig_usr1_handler */
232
233 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
234 {
235   config_flush_at_shutdown = 0;
236   sig_common("USR2");
237 } /* }}} void sig_usr2_handler */
238
239 static void install_signal_handlers(void) /* {{{ */
240 {
241   /* These structures are static, because `sigaction' behaves weird if the are
242    * overwritten.. */
243   static struct sigaction sa_int;
244   static struct sigaction sa_term;
245   static struct sigaction sa_pipe;
246   static struct sigaction sa_usr1;
247   static struct sigaction sa_usr2;
248
249   /* Install signal handlers */
250   memset (&sa_int, 0, sizeof (sa_int));
251   sa_int.sa_handler = sig_int_handler;
252   sigaction (SIGINT, &sa_int, NULL);
253
254   memset (&sa_term, 0, sizeof (sa_term));
255   sa_term.sa_handler = sig_term_handler;
256   sigaction (SIGTERM, &sa_term, NULL);
257
258   memset (&sa_pipe, 0, sizeof (sa_pipe));
259   sa_pipe.sa_handler = SIG_IGN;
260   sigaction (SIGPIPE, &sa_pipe, NULL);
261
262   memset (&sa_pipe, 0, sizeof (sa_usr1));
263   sa_usr1.sa_handler = sig_usr1_handler;
264   sigaction (SIGUSR1, &sa_usr1, NULL);
265
266   memset (&sa_usr2, 0, sizeof (sa_usr2));
267   sa_usr2.sa_handler = sig_usr2_handler;
268   sigaction (SIGUSR2, &sa_usr2, NULL);
269
270 } /* }}} void install_signal_handlers */
271
272 static int open_pidfile(void) /* {{{ */
273 {
274   int fd;
275   char *file;
276
277   file = (config_pid_file != NULL)
278     ? config_pid_file
279     : LOCALSTATEDIR "/run/rrdcached.pid";
280
281   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
282   if (fd < 0)
283     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
284             file, rrd_strerror(errno));
285
286   return(fd);
287 } /* }}} static int open_pidfile */
288
289 static int write_pidfile (int fd) /* {{{ */
290 {
291   pid_t pid;
292   FILE *fh;
293
294   pid = getpid ();
295
296   fh = fdopen (fd, "w");
297   if (fh == NULL)
298   {
299     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
300     close(fd);
301     return (-1);
302   }
303
304   fprintf (fh, "%i\n", (int) pid);
305   fclose (fh);
306
307   return (0);
308 } /* }}} int write_pidfile */
309
310 static int remove_pidfile (void) /* {{{ */
311 {
312   char *file;
313   int status;
314
315   file = (config_pid_file != NULL)
316     ? config_pid_file
317     : LOCALSTATEDIR "/run/rrdcached.pid";
318
319   status = unlink (file);
320   if (status == 0)
321     return (0);
322   return (errno);
323 } /* }}} int remove_pidfile */
324
325 static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
326 {
327   char    *buffer;
328   size_t   buffer_used;
329   size_t   buffer_free;
330   ssize_t  status;
331
332   buffer       = (char *) buffer_void;
333   buffer_used  = 0;
334   buffer_free  = buffer_size;
335
336   while (buffer_free > 0)
337   {
338     status = read (fd, buffer + buffer_used, buffer_free);
339     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
340       continue;
341
342     if (status < 0)
343       return (-1);
344
345     if (status == 0)
346       return (0);
347
348     assert ((0 > status) || (buffer_free >= (size_t) status));
349
350     buffer_free = buffer_free - status;
351     buffer_used = buffer_used + status;
352
353     if (buffer[buffer_used - 1] == '\n')
354       break;
355   }
356
357   assert (buffer_used > 0);
358
359   if (buffer[buffer_used - 1] != '\n')
360   {
361     errno = ENOBUFS;
362     return (-1);
363   }
364
365   buffer[buffer_used - 1] = 0;
366
367   /* Fix network line endings. */
368   if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
369   {
370     buffer_used--;
371     buffer[buffer_used - 1] = 0;
372   }
373
374   return (buffer_used);
375 } /* }}} ssize_t sread */
376
377 static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
378 {
379   const char *ptr;
380   size_t      nleft;
381   ssize_t     status;
382
383   /* special case for journal replay */
384   if (fd < 0) return 0;
385
386   ptr   = (const char *) buf;
387   nleft = count;
388
389   while (nleft > 0)
390   {
391     status = write (fd, (const void *) ptr, nleft);
392
393     if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
394       continue;
395
396     if (status < 0)
397       return (status);
398
399     nleft -= status;
400     ptr   += status;
401   }
402
403   return (0);
404 } /* }}} ssize_t swrite */
405
406 static void wipe_ci_values(cache_item_t *ci, time_t when)
407 {
408   ci->values = NULL;
409   ci->values_num = 0;
410
411   ci->last_flush_time = when;
412   if (config_write_jitter > 0)
413     ci->last_flush_time += (random() % config_write_jitter);
414 }
415
416 /* remove_from_queue
417  * remove a "cache_item_t" item from the queue.
418  * must hold 'cache_lock' when calling this
419  */
420 static void remove_from_queue(cache_item_t *ci) /* {{{ */
421 {
422   if (ci == NULL) return;
423
424   if (ci->prev == NULL)
425     cache_queue_head = ci->next; /* reset head */
426   else
427     ci->prev->next = ci->next;
428
429   if (ci->next == NULL)
430     cache_queue_tail = ci->prev; /* reset the tail */
431   else
432     ci->next->prev = ci->prev;
433
434   ci->next = ci->prev = NULL;
435   ci->flags &= ~CI_FLAGS_IN_QUEUE;
436 } /* }}} static void remove_from_queue */
437
438 /*
439  * enqueue_cache_item:
440  * `cache_lock' must be acquired before calling this function!
441  */
442 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
443     queue_side_t side)
444 {
445   if (ci == NULL)
446     return (-1);
447
448   if (ci->values_num == 0)
449     return (0);
450
451   if (side == HEAD)
452   {
453     if (cache_queue_head == ci)
454       return 0;
455
456     /* remove from the double linked list */
457     if (ci->flags & CI_FLAGS_IN_QUEUE)
458       remove_from_queue(ci);
459
460     ci->prev = NULL;
461     ci->next = cache_queue_head;
462     if (ci->next != NULL)
463       ci->next->prev = ci;
464     cache_queue_head = ci;
465
466     if (cache_queue_tail == NULL)
467       cache_queue_tail = cache_queue_head;
468   }
469   else /* (side == TAIL) */
470   {
471     /* We don't move values back in the list.. */
472     if (ci->flags & CI_FLAGS_IN_QUEUE)
473       return (0);
474
475     assert (ci->next == NULL);
476     assert (ci->prev == NULL);
477
478     ci->prev = cache_queue_tail;
479
480     if (cache_queue_tail == NULL)
481       cache_queue_head = ci;
482     else
483       cache_queue_tail->next = ci;
484
485     cache_queue_tail = ci;
486   }
487
488   ci->flags |= CI_FLAGS_IN_QUEUE;
489
490   pthread_cond_broadcast(&cache_cond);
491   pthread_mutex_lock (&stats_lock);
492   stats_queue_length++;
493   pthread_mutex_unlock (&stats_lock);
494
495   return (0);
496 } /* }}} int enqueue_cache_item */
497
498 /*
499  * tree_callback_flush:
500  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
501  * while this is in progress.
502  */
503 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
504     gpointer data)
505 {
506   cache_item_t *ci;
507   callback_flush_data_t *cfd;
508
509   ci = (cache_item_t *) value;
510   cfd = (callback_flush_data_t *) data;
511
512   if ((ci->last_flush_time <= cfd->abs_timeout)
513       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
514       && (ci->values_num > 0))
515   {
516     enqueue_cache_item (ci, TAIL);
517   }
518   else if ((do_shutdown != 0)
519       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
520       && (ci->values_num > 0))
521   {
522     enqueue_cache_item (ci, TAIL);
523   }
524   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
525       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
526       && (ci->values_num <= 0))
527   {
528     char **temp;
529
530     temp = (char **) realloc (cfd->keys,
531         sizeof (char *) * (cfd->keys_num + 1));
532     if (temp == NULL)
533     {
534       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
535       return (FALSE);
536     }
537     cfd->keys = temp;
538     /* Make really sure this points to the _same_ place */
539     assert ((char *) key == ci->file);
540     cfd->keys[cfd->keys_num] = (char *) key;
541     cfd->keys_num++;
542   }
543
544   return (FALSE);
545 } /* }}} gboolean tree_callback_flush */
546
547 static int flush_old_values (int max_age)
548 {
549   callback_flush_data_t cfd;
550   size_t k;
551
552   memset (&cfd, 0, sizeof (cfd));
553   /* Pass the current time as user data so that we don't need to call
554    * `time' for each node. */
555   cfd.now = time (NULL);
556   cfd.keys = NULL;
557   cfd.keys_num = 0;
558
559   if (max_age > 0)
560     cfd.abs_timeout = cfd.now - max_age;
561   else
562     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
563
564   /* `tree_callback_flush' will return the keys of all values that haven't
565    * been touched in the last `config_flush_interval' seconds in `cfd'.
566    * The char*'s in this array point to the same memory as ci->file, so we
567    * don't need to free them separately. */
568   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
569
570   for (k = 0; k < cfd.keys_num; k++)
571   {
572     cache_item_t *ci;
573
574     /* This must not fail. */
575     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
576     assert (ci != NULL);
577
578     /* If we end up here with values available, something's seriously
579      * messed up. */
580     assert (ci->values_num == 0);
581
582     /* Remove the node from the tree */
583     g_tree_remove (cache_tree, cfd.keys[k]);
584     cfd.keys[k] = NULL;
585
586     /* Now free and clean up `ci'. */
587     free (ci->file);
588     ci->file = NULL;
589     free (ci);
590     ci = NULL;
591   } /* for (k = 0; k < cfd.keys_num; k++) */
592
593   if (cfd.keys != NULL)
594   {
595     free (cfd.keys);
596     cfd.keys = NULL;
597   }
598
599   return (0);
600 } /* int flush_old_values */
601
602 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
603 {
604   struct timeval now;
605   struct timespec next_flush;
606   int final_flush = 0; /* make sure we only flush once on shutdown */
607
608   gettimeofday (&now, NULL);
609   next_flush.tv_sec = now.tv_sec + config_flush_interval;
610   next_flush.tv_nsec = 1000 * now.tv_usec;
611
612   pthread_mutex_lock (&cache_lock);
613   while ((do_shutdown == 0) || (cache_queue_head != NULL))
614   {
615     cache_item_t *ci;
616     char *file;
617     char **values;
618     int values_num;
619     int status;
620     int i;
621
622     /* First, check if it's time to do the cache flush. */
623     gettimeofday (&now, NULL);
624     if ((now.tv_sec > next_flush.tv_sec)
625         || ((now.tv_sec == next_flush.tv_sec)
626           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
627     {
628       /* Flush all values that haven't been written in the last
629        * `config_write_interval' seconds. */
630       flush_old_values (config_write_interval);
631
632       /* Determine the time of the next cache flush. */
633       while (next_flush.tv_sec <= now.tv_sec)
634         next_flush.tv_sec += config_flush_interval;
635
636       /* unlock the cache while we rotate so we don't block incoming
637        * updates if the fsync() blocks on disk I/O */
638       pthread_mutex_unlock(&cache_lock);
639       journal_rotate();
640       pthread_mutex_lock(&cache_lock);
641     }
642
643     /* Now, check if there's something to store away. If not, wait until
644      * something comes in or it's time to do the cache flush.  if we are
645      * shutting down, do not wait around.  */
646     if (cache_queue_head == NULL && !do_shutdown)
647     {
648       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
649       if ((status != 0) && (status != ETIMEDOUT))
650       {
651         RRDD_LOG (LOG_ERR, "queue_thread_main: "
652             "pthread_cond_timedwait returned %i.", status);
653       }
654     }
655
656     /* We're about to shut down */
657     if (do_shutdown != 0 && !final_flush++)
658     {
659       if (config_flush_at_shutdown)
660         flush_old_values (-1); /* flush everything */
661       else
662         break;
663     }
664
665     /* Check if a value has arrived. This may be NULL if we timed out or there
666      * was an interrupt such as a signal. */
667     if (cache_queue_head == NULL)
668       continue;
669
670     ci = cache_queue_head;
671
672     /* copy the relevant parts */
673     file = strdup (ci->file);
674     if (file == NULL)
675     {
676       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
677       continue;
678     }
679
680     assert(ci->values != NULL);
681     assert(ci->values_num > 0);
682
683     values = ci->values;
684     values_num = ci->values_num;
685
686     wipe_ci_values(ci, time(NULL));
687     remove_from_queue(ci);
688
689     pthread_mutex_lock (&stats_lock);
690     assert (stats_queue_length > 0);
691     stats_queue_length--;
692     pthread_mutex_unlock (&stats_lock);
693
694     pthread_mutex_unlock (&cache_lock);
695
696     rrd_clear_error ();
697     status = rrd_update_r (file, NULL, values_num, (void *) values);
698     if (status != 0)
699     {
700       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
701           "rrd_update_r (%s) failed with status %i. (%s)",
702           file, status, rrd_get_error());
703     }
704
705     journal_write("wrote", file);
706     pthread_cond_broadcast(&ci->flushed);
707
708     for (i = 0; i < values_num; i++)
709       free (values[i]);
710
711     free(values);
712     free(file);
713
714     if (status == 0)
715     {
716       pthread_mutex_lock (&stats_lock);
717       stats_updates_written++;
718       stats_data_sets_written += values_num;
719       pthread_mutex_unlock (&stats_lock);
720     }
721
722     pthread_mutex_lock (&cache_lock);
723
724     /* We're about to shut down */
725     if (do_shutdown != 0 && !final_flush++)
726     {
727       if (config_flush_at_shutdown)
728           flush_old_values (-1); /* flush everything */
729       else
730         break;
731     }
732   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
733   pthread_mutex_unlock (&cache_lock);
734
735   if (config_flush_at_shutdown)
736   {
737     assert(cache_queue_head == NULL);
738     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
739   }
740
741   journal_done();
742
743   return (NULL);
744 } /* }}} void *queue_thread_main */
745
746 static int buffer_get_field (char **buffer_ret, /* {{{ */
747     size_t *buffer_size_ret, char **field_ret)
748 {
749   char *buffer;
750   size_t buffer_pos;
751   size_t buffer_size;
752   char *field;
753   size_t field_size;
754   int status;
755
756   buffer = *buffer_ret;
757   buffer_pos = 0;
758   buffer_size = *buffer_size_ret;
759   field = *buffer_ret;
760   field_size = 0;
761
762   if (buffer_size <= 0)
763     return (-1);
764
765   /* This is ensured by `handle_request'. */
766   assert (buffer[buffer_size - 1] == '\0');
767
768   status = -1;
769   while (buffer_pos < buffer_size)
770   {
771     /* Check for end-of-field or end-of-buffer */
772     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
773     {
774       field[field_size] = 0;
775       field_size++;
776       buffer_pos++;
777       status = 0;
778       break;
779     }
780     /* Handle escaped characters. */
781     else if (buffer[buffer_pos] == '\\')
782     {
783       if (buffer_pos >= (buffer_size - 1))
784         break;
785       buffer_pos++;
786       field[field_size] = buffer[buffer_pos];
787       field_size++;
788       buffer_pos++;
789     }
790     /* Normal operation */ 
791     else
792     {
793       field[field_size] = buffer[buffer_pos];
794       field_size++;
795       buffer_pos++;
796     }
797   } /* while (buffer_pos < buffer_size) */
798
799   if (status != 0)
800     return (status);
801
802   *buffer_ret = buffer + buffer_pos;
803   *buffer_size_ret = buffer_size - buffer_pos;
804   *field_ret = field;
805
806   return (0);
807 } /* }}} int buffer_get_field */
808
809 /* if we're restricting writes to the base directory,
810  * check whether the file falls within the dir
811  * returns 1 if OK, otherwise 0
812  */
813 static int check_file_access (const char *file, int fd) /* {{{ */
814 {
815   char error[CMD_MAX];
816   assert(file != NULL);
817
818   if (!config_write_base_only
819       || fd < 0 /* journal replay */
820       || config_base_dir == NULL)
821     return 1;
822
823   if (strstr(file, "../") != NULL) goto err;
824
825   /* relative paths without "../" are ok */
826   if (*file != '/') return 1;
827
828   /* file must be of the format base + "/" + <1+ char filename> */
829   if (strlen(file) < _config_base_dir_len + 2) goto err;
830   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
831   if (*(file + _config_base_dir_len) != '/') goto err;
832
833   return 1;
834
835 err:
836   snprintf(error, sizeof(error)-1, "-1 %s\n", rrd_strerror(EACCES));
837   swrite(fd, error, strlen(error));
838   return 0;
839 } /* }}} static int check_file_access */
840
841 static int flush_file (const char *filename) /* {{{ */
842 {
843   cache_item_t *ci;
844
845   pthread_mutex_lock (&cache_lock);
846
847   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
848   if (ci == NULL)
849   {
850     pthread_mutex_unlock (&cache_lock);
851     return (ENOENT);
852   }
853
854   if (ci->values_num > 0)
855   {
856     /* Enqueue at head */
857     enqueue_cache_item (ci, HEAD);
858     pthread_cond_wait(&ci->flushed, &cache_lock);
859   }
860
861   pthread_mutex_unlock(&cache_lock);
862
863   return (0);
864 } /* }}} int flush_file */
865
866 static int handle_request_help (int fd, /* {{{ */
867     char *buffer, size_t buffer_size)
868 {
869   int status;
870   char **help_text;
871   size_t help_text_len;
872   char *command;
873   size_t i;
874
875   char *help_help[] =
876   {
877     "5 Command overview\n",
878     "FLUSH <filename>\n",
879     "FLUSHALL\n",
880     "HELP [<command>]\n",
881     "UPDATE <filename> <values> [<values> ...]\n",
882     "STATS\n"
883   };
884   size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
885
886   char *help_flush[] =
887   {
888     "4 Help for FLUSH\n",
889     "Usage: FLUSH <filename>\n",
890     "\n",
891     "Adds the given filename to the head of the update queue and returns\n",
892     "after is has been dequeued.\n"
893   };
894   size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
895
896   char *help_flushall[] =
897   {
898     "3 Help for FLUSHALL\n",
899     "Usage: FLUSHALL\n",
900     "\n",
901     "Triggers writing of all pending updates.  Returns immediately.\n"
902   };
903   size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]);
904
905   char *help_update[] =
906   {
907     "9 Help for UPDATE\n",
908     "Usage: UPDATE <filename> <values> [<values> ...]\n"
909     "\n",
910     "Adds the given file to the internal cache if it is not yet known and\n",
911     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
912     "for details.\n",
913     "\n",
914     "Each <values> has the following form:\n",
915     "  <values> = <time>:<value>[:<value>[...]]\n",
916     "See the rrdupdate(1) manpage for details.\n"
917   };
918   size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
919
920   char *help_stats[] =
921   {
922     "4 Help for STATS\n",
923     "Usage: STATS\n",
924     "\n",
925     "Returns some performance counters, see the rrdcached(1) manpage for\n",
926     "a description of the values.\n"
927   };
928   size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
929
930   status = buffer_get_field (&buffer, &buffer_size, &command);
931   if (status != 0)
932   {
933     help_text = help_help;
934     help_text_len = help_help_len;
935   }
936   else
937   {
938     if (strcasecmp (command, "update") == 0)
939     {
940       help_text = help_update;
941       help_text_len = help_update_len;
942     }
943     else if (strcasecmp (command, "flush") == 0)
944     {
945       help_text = help_flush;
946       help_text_len = help_flush_len;
947     }
948     else if (strcasecmp (command, "flushall") == 0)
949     {
950       help_text = help_flushall;
951       help_text_len = help_flushall_len;
952     }
953     else if (strcasecmp (command, "stats") == 0)
954     {
955       help_text = help_stats;
956       help_text_len = help_stats_len;
957     }
958     else
959     {
960       help_text = help_help;
961       help_text_len = help_help_len;
962     }
963   }
964
965   for (i = 0; i < help_text_len; i++)
966   {
967     status = swrite (fd, help_text[i], strlen (help_text[i]));
968     if (status < 0)
969     {
970       status = errno;
971       RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
972       return (status);
973     }
974   }
975
976   return (0);
977 } /* }}} int handle_request_help */
978
979 static int handle_request_stats (int fd, /* {{{ */
980     char *buffer __attribute__((unused)),
981     size_t buffer_size __attribute__((unused)))
982 {
983   int status;
984   char outbuf[CMD_MAX];
985
986   uint64_t copy_queue_length;
987   uint64_t copy_updates_received;
988   uint64_t copy_flush_received;
989   uint64_t copy_updates_written;
990   uint64_t copy_data_sets_written;
991   uint64_t copy_journal_bytes;
992   uint64_t copy_journal_rotate;
993
994   uint64_t tree_nodes_number;
995   uint64_t tree_depth;
996
997   pthread_mutex_lock (&stats_lock);
998   copy_queue_length       = stats_queue_length;
999   copy_updates_received   = stats_updates_received;
1000   copy_flush_received     = stats_flush_received;
1001   copy_updates_written    = stats_updates_written;
1002   copy_data_sets_written  = stats_data_sets_written;
1003   copy_journal_bytes      = stats_journal_bytes;
1004   copy_journal_rotate     = stats_journal_rotate;
1005   pthread_mutex_unlock (&stats_lock);
1006
1007   pthread_mutex_lock (&cache_lock);
1008   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1009   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1010   pthread_mutex_unlock (&cache_lock);
1011
1012 #define RRDD_STATS_SEND \
1013   outbuf[sizeof (outbuf) - 1] = 0; \
1014   status = swrite (fd, outbuf, strlen (outbuf)); \
1015   if (status < 0) \
1016   { \
1017     status = errno; \
1018     RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
1019     return (status); \
1020   }
1021
1022   strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf));
1023   RRDD_STATS_SEND;
1024
1025   snprintf (outbuf, sizeof (outbuf),
1026       "QueueLength: %"PRIu64"\n", copy_queue_length);
1027   RRDD_STATS_SEND;
1028
1029   snprintf (outbuf, sizeof (outbuf),
1030       "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1031   RRDD_STATS_SEND;
1032
1033   snprintf (outbuf, sizeof (outbuf),
1034       "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1035   RRDD_STATS_SEND;
1036
1037   snprintf (outbuf, sizeof (outbuf),
1038       "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1039   RRDD_STATS_SEND;
1040
1041   snprintf (outbuf, sizeof (outbuf),
1042       "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1043   RRDD_STATS_SEND;
1044
1045   snprintf (outbuf, sizeof (outbuf),
1046       "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1047   RRDD_STATS_SEND;
1048
1049   snprintf (outbuf, sizeof (outbuf),
1050       "TreeDepth: %"PRIu64"\n", tree_depth);
1051   RRDD_STATS_SEND;
1052
1053   snprintf (outbuf, sizeof(outbuf),
1054       "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1055   RRDD_STATS_SEND;
1056
1057   snprintf (outbuf, sizeof(outbuf),
1058       "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1059   RRDD_STATS_SEND;
1060
1061   return (0);
1062 #undef RRDD_STATS_SEND
1063 } /* }}} int handle_request_stats */
1064
1065 static int handle_request_flush (int fd, /* {{{ */
1066     char *buffer, size_t buffer_size)
1067 {
1068   char *file;
1069   int status;
1070   char result[CMD_MAX];
1071
1072   status = buffer_get_field (&buffer, &buffer_size, &file);
1073   if (status != 0)
1074   {
1075     strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
1076   }
1077   else
1078   {
1079     pthread_mutex_lock(&stats_lock);
1080     stats_flush_received++;
1081     pthread_mutex_unlock(&stats_lock);
1082
1083     if (!check_file_access(file, fd)) return 0;
1084
1085     status = flush_file (file);
1086     if (status == 0)
1087       snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
1088     else if (status == ENOENT)
1089     {
1090       /* no file in our tree; see whether it exists at all */
1091       struct stat statbuf;
1092
1093       memset(&statbuf, 0, sizeof(statbuf));
1094       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1095         snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file);
1096       else
1097         snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
1098     }
1099     else if (status < 0)
1100       strncpy (result, "-1 Internal error.\n", sizeof (result));
1101     else
1102       snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
1103   }
1104   result[sizeof (result) - 1] = 0;
1105
1106   status = swrite (fd, result, strlen (result));
1107   if (status < 0)
1108   {
1109     status = errno;
1110     RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
1111     return (status);
1112   }
1113
1114   return (0);
1115 } /* }}} int handle_request_flush */
1116
1117 static int handle_request_flushall(int fd) /* {{{ */
1118 {
1119   int status;
1120   char answer[] ="0 Started flush.\n";
1121
1122   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1123
1124   pthread_mutex_lock(&cache_lock);
1125   flush_old_values(-1);
1126   pthread_mutex_unlock(&cache_lock);
1127
1128   status = swrite(fd, answer, strlen(answer));
1129   if (status < 0)
1130   {
1131     status = errno;
1132     RRDD_LOG(LOG_INFO, "handle_request_flushall: swrite returned an error.");
1133   }
1134
1135   return (status);
1136 } /* }}} static int handle_request_flushall */
1137
1138 static int handle_request_update (int fd, /* {{{ */
1139     char *buffer, size_t buffer_size)
1140 {
1141   char *file;
1142   int values_num = 0;
1143   int status;
1144
1145   time_t now;
1146
1147   cache_item_t *ci;
1148   char answer[CMD_MAX];
1149
1150 #define RRDD_UPDATE_SEND \
1151   answer[sizeof (answer) - 1] = 0; \
1152   status = swrite (fd, answer, strlen (answer)); \
1153   if (status < 0) \
1154   { \
1155     status = errno; \
1156     RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
1157     return (status); \
1158   }
1159
1160   now = time (NULL);
1161
1162   status = buffer_get_field (&buffer, &buffer_size, &file);
1163   if (status != 0)
1164   {
1165     strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
1166         sizeof (answer));
1167     RRDD_UPDATE_SEND;
1168     return (0);
1169   }
1170
1171   pthread_mutex_lock(&stats_lock);
1172   stats_updates_received++;
1173   pthread_mutex_unlock(&stats_lock);
1174
1175   if (!check_file_access(file, fd)) return 0;
1176
1177   pthread_mutex_lock (&cache_lock);
1178   ci = g_tree_lookup (cache_tree, file);
1179
1180   if (ci == NULL) /* {{{ */
1181   {
1182     struct stat statbuf;
1183
1184     /* don't hold the lock while we setup; stat(2) might block */
1185     pthread_mutex_unlock(&cache_lock);
1186
1187     memset (&statbuf, 0, sizeof (statbuf));
1188     status = stat (file, &statbuf);
1189     if (status != 0)
1190     {
1191       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1192
1193       status = errno;
1194       if (status == ENOENT)
1195         snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file);
1196       else
1197         snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
1198             status);
1199       RRDD_UPDATE_SEND;
1200       return (0);
1201     }
1202     if (!S_ISREG (statbuf.st_mode))
1203     {
1204       snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file);
1205       RRDD_UPDATE_SEND;
1206       return (0);
1207     }
1208     if (access(file, R_OK|W_OK) != 0)
1209     {
1210       snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n",
1211                 file, rrd_strerror(errno));
1212       RRDD_UPDATE_SEND;
1213       return (0);
1214     }
1215
1216     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1217     if (ci == NULL)
1218     {
1219       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1220
1221       strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
1222       RRDD_UPDATE_SEND;
1223       return (0);
1224     }
1225     memset (ci, 0, sizeof (cache_item_t));
1226
1227     ci->file = strdup (file);
1228     if (ci->file == NULL)
1229     {
1230       free (ci);
1231       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1232
1233       strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
1234       RRDD_UPDATE_SEND;
1235       return (0);
1236     }
1237
1238     wipe_ci_values(ci, now);
1239     ci->flags = CI_FLAGS_IN_TREE;
1240
1241     pthread_mutex_lock(&cache_lock);
1242     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1243   } /* }}} */
1244   assert (ci != NULL);
1245
1246   while (buffer_size > 0)
1247   {
1248     char **temp;
1249     char *value;
1250
1251     status = buffer_get_field (&buffer, &buffer_size, &value);
1252     if (status != 0)
1253     {
1254       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1255       break;
1256     }
1257
1258     temp = (char **) realloc (ci->values,
1259         sizeof (char *) * (ci->values_num + 1));
1260     if (temp == NULL)
1261     {
1262       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1263       continue;
1264     }
1265     ci->values = temp;
1266
1267     ci->values[ci->values_num] = strdup (value);
1268     if (ci->values[ci->values_num] == NULL)
1269     {
1270       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1271       continue;
1272     }
1273     ci->values_num++;
1274
1275     values_num++;
1276   }
1277
1278   if (((now - ci->last_flush_time) >= config_write_interval)
1279       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1280       && (ci->values_num > 0))
1281   {
1282     enqueue_cache_item (ci, TAIL);
1283   }
1284
1285   pthread_mutex_unlock (&cache_lock);
1286
1287   if (values_num < 1)
1288   {
1289     strncpy (answer, "-1 No values updated.\n", sizeof (answer));
1290   }
1291   else
1292   {
1293     snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
1294         (values_num == 1) ? "" : "s");
1295   }
1296   RRDD_UPDATE_SEND;
1297   return (0);
1298 #undef RRDD_UPDATE_SEND
1299 } /* }}} int handle_request_update */
1300
1301 /* we came across a "WROTE" entry during journal replay.
1302  * throw away any values that we have accumulated for this file
1303  */
1304 static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
1305                                  const char *buffer,
1306                                  size_t buffer_size __attribute__((unused)))
1307 {
1308   int i;
1309   cache_item_t *ci;
1310   const char *file = buffer;
1311
1312   pthread_mutex_lock(&cache_lock);
1313
1314   ci = g_tree_lookup(cache_tree, file);
1315   if (ci == NULL)
1316   {
1317     pthread_mutex_unlock(&cache_lock);
1318     return (0);
1319   }
1320
1321   if (ci->values)
1322   {
1323     for (i=0; i < ci->values_num; i++)
1324       free(ci->values[i]);
1325
1326     free(ci->values);
1327   }
1328
1329   wipe_ci_values(ci, time(NULL));
1330   remove_from_queue(ci);
1331
1332   pthread_mutex_unlock(&cache_lock);
1333   return (0);
1334 } /* }}} int handle_request_wrote */
1335
1336 /* returns 1 if we have the required privilege level */
1337 static int has_privilege (socket_privilege priv, /* {{{ */
1338                           socket_privilege required, int fd)
1339 {
1340   int status;
1341   char error[CMD_MAX];
1342
1343   if (priv >= required)
1344     return 1;
1345
1346   sprintf(error, "-1 %s\n", rrd_strerror(EACCES));
1347   status = swrite(fd, error, strlen(error));
1348
1349   if (status < 0)
1350     return status;
1351   else
1352     return 0;
1353 } /* }}} static int has_privilege */
1354
1355 /* if fd < 0, we are in journal replay mode */
1356 static int handle_request (int fd, socket_privilege privilege, /* {{{ */
1357                            char *buffer, size_t buffer_size)
1358 {
1359   char *buffer_ptr;
1360   char *command;
1361   int status;
1362
1363   assert (buffer[buffer_size - 1] == '\0');
1364
1365   buffer_ptr = buffer;
1366   command = NULL;
1367   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1368   if (status != 0)
1369   {
1370     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1371     return (-1);
1372   }
1373
1374   if (strcasecmp (command, "update") == 0)
1375   {
1376     status = has_privilege(privilege, PRIV_HIGH, fd);
1377     if (status <= 0)
1378       return status;
1379
1380     /* don't re-write updates in replay mode */
1381     if (fd >= 0)
1382       journal_write(command, buffer_ptr);
1383
1384     return (handle_request_update (fd, buffer_ptr, buffer_size));
1385   }
1386   else if (strcasecmp (command, "wrote") == 0 && fd < 0)
1387   {
1388     /* this is only valid in replay mode */
1389     return (handle_request_wrote (fd, buffer_ptr, buffer_size));
1390   }
1391   else if (strcasecmp (command, "flush") == 0)
1392   {
1393     return (handle_request_flush (fd, buffer_ptr, buffer_size));
1394   }
1395   else if (strcasecmp (command, "flushall") == 0)
1396   {
1397     status = has_privilege(privilege, PRIV_HIGH, fd);
1398     if (status <= 0)
1399       return status;
1400
1401     return (handle_request_flushall(fd));
1402   }
1403   else if (strcasecmp (command, "stats") == 0)
1404   {
1405     return (handle_request_stats (fd, buffer_ptr, buffer_size));
1406   }
1407   else if (strcasecmp (command, "help") == 0)
1408   {
1409     return (handle_request_help (fd, buffer_ptr, buffer_size));
1410   }
1411   else
1412   {
1413     char result[CMD_MAX];
1414
1415     snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
1416     result[sizeof (result) - 1] = 0;
1417
1418     status = swrite (fd, result, strlen (result));
1419     if (status < 0)
1420     {
1421       RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
1422       return (-1);
1423     }
1424   }
1425
1426   return (0);
1427 } /* }}} int handle_request */
1428
1429 /* MUST NOT hold journal_lock before calling this */
1430 static void journal_rotate(void) /* {{{ */
1431 {
1432   FILE *old_fh = NULL;
1433
1434   if (journal_cur == NULL || journal_old == NULL)
1435     return;
1436
1437   pthread_mutex_lock(&journal_lock);
1438
1439   /* we rotate this way (rename before close) so that the we can release
1440    * the journal lock as fast as possible.  Journal writes to the new
1441    * journal can proceed immediately after the new file is opened.  The
1442    * fclose can then block without affecting new updates.
1443    */
1444   if (journal_fh != NULL)
1445   {
1446     old_fh = journal_fh;
1447     rename(journal_cur, journal_old);
1448     ++stats_journal_rotate;
1449   }
1450
1451   journal_fh = fopen(journal_cur, "a");
1452   pthread_mutex_unlock(&journal_lock);
1453
1454   if (old_fh != NULL)
1455     fclose(old_fh);
1456
1457   if (journal_fh == NULL)
1458   {
1459     RRDD_LOG(LOG_CRIT,
1460              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1461              journal_cur, rrd_strerror(errno));
1462
1463     RRDD_LOG(LOG_ERR,
1464              "JOURNALING DISABLED: All values will be flushed at shutdown");
1465     config_flush_at_shutdown = 1;
1466   }
1467
1468 } /* }}} static void journal_rotate */
1469
1470 static void journal_done(void) /* {{{ */
1471 {
1472   if (journal_cur == NULL)
1473     return;
1474
1475   pthread_mutex_lock(&journal_lock);
1476   if (journal_fh != NULL)
1477   {
1478     fclose(journal_fh);
1479     journal_fh = NULL;
1480   }
1481
1482   if (config_flush_at_shutdown)
1483   {
1484     RRDD_LOG(LOG_INFO, "removing journals");
1485     unlink(journal_old);
1486     unlink(journal_cur);
1487   }
1488   else
1489   {
1490     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1491              "journals will be used at next startup");
1492   }
1493
1494   pthread_mutex_unlock(&journal_lock);
1495
1496 } /* }}} static void journal_done */
1497
1498 static int journal_write(char *cmd, char *args) /* {{{ */
1499 {
1500   int chars;
1501
1502   if (journal_fh == NULL)
1503     return 0;
1504
1505   pthread_mutex_lock(&journal_lock);
1506   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1507   pthread_mutex_unlock(&journal_lock);
1508
1509   if (chars > 0)
1510   {
1511     pthread_mutex_lock(&stats_lock);
1512     stats_journal_bytes += chars;
1513     pthread_mutex_unlock(&stats_lock);
1514   }
1515
1516   return chars;
1517 } /* }}} static int journal_write */
1518
1519 static int journal_replay (const char *file) /* {{{ */
1520 {
1521   FILE *fh;
1522   int entry_cnt = 0;
1523   int fail_cnt = 0;
1524   uint64_t line = 0;
1525   char entry[CMD_MAX];
1526
1527   if (file == NULL) return 0;
1528
1529   fh = fopen(file, "r");
1530   if (fh == NULL)
1531   {
1532     if (errno != ENOENT)
1533       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1534                file, rrd_strerror(errno));
1535     return 0;
1536   }
1537   else
1538     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1539
1540   while(!feof(fh))
1541   {
1542     size_t entry_len;
1543
1544     ++line;
1545     if (fgets(entry, sizeof(entry), fh) == NULL)
1546       break;
1547     entry_len = strlen(entry);
1548
1549     /* check \n termination in case journal writing crashed mid-line */
1550     if (entry_len == 0)
1551       continue;
1552     else if (entry[entry_len - 1] != '\n')
1553     {
1554       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1555       ++fail_cnt;
1556       continue;
1557     }
1558
1559     entry[entry_len - 1] = '\0';
1560
1561     if (handle_request(-1, PRIV_HIGH, entry, entry_len) == 0)
1562       ++entry_cnt;
1563     else
1564       ++fail_cnt;
1565   }
1566
1567   fclose(fh);
1568
1569   if (entry_cnt > 0)
1570   {
1571     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1572              entry_cnt, fail_cnt);
1573     return 1;
1574   }
1575   else
1576     return 0;
1577
1578 } /* }}} static int journal_replay */
1579
1580 static void *connection_thread_main (void *args) /* {{{ */
1581 {
1582   pthread_t self;
1583   listen_socket_t *sock;
1584   int i;
1585   int fd;
1586
1587   sock = (listen_socket_t *) args;
1588   fd = sock->fd;
1589
1590   pthread_mutex_lock (&connection_threads_lock);
1591   {
1592     pthread_t *temp;
1593
1594     temp = (pthread_t *) realloc (connection_threads,
1595         sizeof (pthread_t) * (connection_threads_num + 1));
1596     if (temp == NULL)
1597     {
1598       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1599     }
1600     else
1601     {
1602       connection_threads = temp;
1603       connection_threads[connection_threads_num] = pthread_self ();
1604       connection_threads_num++;
1605     }
1606   }
1607   pthread_mutex_unlock (&connection_threads_lock);
1608
1609   while (do_shutdown == 0)
1610   {
1611     char buffer[CMD_MAX];
1612
1613     struct pollfd pollfd;
1614     int status;
1615
1616     pollfd.fd = fd;
1617     pollfd.events = POLLIN | POLLPRI;
1618     pollfd.revents = 0;
1619
1620     status = poll (&pollfd, 1, /* timeout = */ 500);
1621     if (do_shutdown)
1622       break;
1623     else if (status == 0) /* timeout */
1624       continue;
1625     else if (status < 0) /* error */
1626     {
1627       status = errno;
1628       if (status == EINTR)
1629         continue;
1630       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1631       continue;
1632     }
1633
1634     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1635     {
1636       close (fd);
1637       break;
1638     }
1639     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1640     {
1641       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1642           "poll(2) returned something unexpected: %#04hx",
1643           pollfd.revents);
1644       close (fd);
1645       break;
1646     }
1647
1648     status = (int) sread (fd, buffer, sizeof (buffer));
1649     if (status <= 0)
1650     {
1651       close (fd);
1652
1653       if (status < 0)
1654         RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed.");
1655
1656       break;
1657     }
1658
1659     status = handle_request (fd, sock->privilege, buffer, status);
1660     if (status != 0)
1661       break;
1662   }
1663
1664   close(fd);
1665   free(args);
1666
1667   self = pthread_self ();
1668   /* Remove this thread from the connection threads list */
1669   pthread_mutex_lock (&connection_threads_lock);
1670   /* Find out own index in the array */
1671   for (i = 0; i < connection_threads_num; i++)
1672     if (pthread_equal (connection_threads[i], self) != 0)
1673       break;
1674   assert (i < connection_threads_num);
1675
1676   /* Move the trailing threads forward. */
1677   if (i < (connection_threads_num - 1))
1678   {
1679     memmove (connection_threads + i,
1680         connection_threads + i + 1,
1681         sizeof (pthread_t) * (connection_threads_num - i - 1));
1682   }
1683
1684   connection_threads_num--;
1685   pthread_mutex_unlock (&connection_threads_lock);
1686
1687   return (NULL);
1688 } /* }}} void *connection_thread_main */
1689
1690 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1691 {
1692   int fd;
1693   struct sockaddr_un sa;
1694   listen_socket_t *temp;
1695   int status;
1696   const char *path;
1697
1698   path = sock->addr;
1699   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1700     path += strlen("unix:");
1701
1702   temp = (listen_socket_t *) realloc (listen_fds,
1703       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1704   if (temp == NULL)
1705   {
1706     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1707     return (-1);
1708   }
1709   listen_fds = temp;
1710   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1711
1712   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1713   if (fd < 0)
1714   {
1715     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1716     return (-1);
1717   }
1718
1719   memset (&sa, 0, sizeof (sa));
1720   sa.sun_family = AF_UNIX;
1721   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1722
1723   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1724   if (status != 0)
1725   {
1726     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1727     close (fd);
1728     unlink (path);
1729     return (-1);
1730   }
1731
1732   status = listen (fd, /* backlog = */ 10);
1733   if (status != 0)
1734   {
1735     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1736     close (fd);
1737     unlink (path);
1738     return (-1);
1739   }
1740
1741   listen_fds[listen_fds_num].fd = fd;
1742   listen_fds[listen_fds_num].family = PF_UNIX;
1743   strncpy(listen_fds[listen_fds_num].addr, path,
1744           sizeof (listen_fds[listen_fds_num].addr) - 1);
1745   listen_fds_num++;
1746
1747   return (0);
1748 } /* }}} int open_listen_socket_unix */
1749
1750 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1751 {
1752   struct addrinfo ai_hints;
1753   struct addrinfo *ai_res;
1754   struct addrinfo *ai_ptr;
1755   char addr_copy[NI_MAXHOST];
1756   char *addr;
1757   char *port;
1758   int status;
1759
1760   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1761   addr_copy[sizeof (addr_copy) - 1] = 0;
1762   addr = addr_copy;
1763
1764   memset (&ai_hints, 0, sizeof (ai_hints));
1765   ai_hints.ai_flags = 0;
1766 #ifdef AI_ADDRCONFIG
1767   ai_hints.ai_flags |= AI_ADDRCONFIG;
1768 #endif
1769   ai_hints.ai_family = AF_UNSPEC;
1770   ai_hints.ai_socktype = SOCK_STREAM;
1771
1772   port = NULL;
1773   if (*addr == '[') /* IPv6+port format */
1774   {
1775     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1776     addr++;
1777
1778     port = strchr (addr, ']');
1779     if (port == NULL)
1780     {
1781       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1782           sock->addr);
1783       return (-1);
1784     }
1785     *port = 0;
1786     port++;
1787
1788     if (*port == ':')
1789       port++;
1790     else if (*port == 0)
1791       port = NULL;
1792     else
1793     {
1794       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1795           port);
1796       return (-1);
1797     }
1798   } /* if (*addr = ']') */
1799   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1800   {
1801     port = rindex(addr, ':');
1802     if (port != NULL)
1803     {
1804       *port = 0;
1805       port++;
1806     }
1807   }
1808   ai_res = NULL;
1809   status = getaddrinfo (addr,
1810                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1811                         &ai_hints, &ai_res);
1812   if (status != 0)
1813   {
1814     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1815         "%s", addr, gai_strerror (status));
1816     return (-1);
1817   }
1818
1819   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1820   {
1821     int fd;
1822     listen_socket_t *temp;
1823     int one = 1;
1824
1825     temp = (listen_socket_t *) realloc (listen_fds,
1826         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1827     if (temp == NULL)
1828     {
1829       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1830       continue;
1831     }
1832     listen_fds = temp;
1833     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1834
1835     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1836     if (fd < 0)
1837     {
1838       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1839       continue;
1840     }
1841
1842     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1843
1844     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1845     if (status != 0)
1846     {
1847       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1848       close (fd);
1849       continue;
1850     }
1851
1852     status = listen (fd, /* backlog = */ 10);
1853     if (status != 0)
1854     {
1855       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1856       close (fd);
1857       return (-1);
1858     }
1859
1860     listen_fds[listen_fds_num].fd = fd;
1861     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1862     listen_fds_num++;
1863   } /* for (ai_ptr) */
1864
1865   return (0);
1866 } /* }}} static int open_listen_socket_network */
1867
1868 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1869 {
1870   assert(sock != NULL);
1871   assert(sock->addr != NULL);
1872
1873   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1874       || sock->addr[0] == '/')
1875     return (open_listen_socket_unix(sock));
1876   else
1877     return (open_listen_socket_network(sock));
1878 } /* }}} int open_listen_socket */
1879
1880 static int close_listen_sockets (void) /* {{{ */
1881 {
1882   size_t i;
1883
1884   for (i = 0; i < listen_fds_num; i++)
1885   {
1886     close (listen_fds[i].fd);
1887
1888     if (listen_fds[i].family == PF_UNIX)
1889       unlink(listen_fds[i].addr);
1890   }
1891
1892   free (listen_fds);
1893   listen_fds = NULL;
1894   listen_fds_num = 0;
1895
1896   return (0);
1897 } /* }}} int close_listen_sockets */
1898
1899 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1900 {
1901   struct pollfd *pollfds;
1902   int pollfds_num;
1903   int status;
1904   int i;
1905
1906   for (i = 0; i < config_listen_address_list_len; i++)
1907     open_listen_socket (config_listen_address_list[i]);
1908
1909   if (config_listen_address_list_len < 1)
1910   {
1911     listen_socket_t sock;
1912     memset(&sock, 0, sizeof(sock));
1913     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1914     open_listen_socket (&sock);
1915   }
1916
1917   if (listen_fds_num < 1)
1918   {
1919     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1920         "could be opened. Sorry.");
1921     return (NULL);
1922   }
1923
1924   pollfds_num = listen_fds_num;
1925   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1926   if (pollfds == NULL)
1927   {
1928     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1929     return (NULL);
1930   }
1931   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1932
1933   RRDD_LOG(LOG_INFO, "listening for connections");
1934
1935   while (do_shutdown == 0)
1936   {
1937     assert (pollfds_num == ((int) listen_fds_num));
1938     for (i = 0; i < pollfds_num; i++)
1939     {
1940       pollfds[i].fd = listen_fds[i].fd;
1941       pollfds[i].events = POLLIN | POLLPRI;
1942       pollfds[i].revents = 0;
1943     }
1944
1945     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1946     if (do_shutdown)
1947       break;
1948     else if (status == 0) /* timeout */
1949       continue;
1950     else if (status < 0) /* error */
1951     {
1952       status = errno;
1953       if (status != EINTR)
1954       {
1955         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1956       }
1957       continue;
1958     }
1959
1960     for (i = 0; i < pollfds_num; i++)
1961     {
1962       listen_socket_t *client_sock;
1963       struct sockaddr_storage client_sa;
1964       socklen_t client_sa_size;
1965       pthread_t tid;
1966       pthread_attr_t attr;
1967
1968       if (pollfds[i].revents == 0)
1969         continue;
1970
1971       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1972       {
1973         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1974             "poll(2) returned something unexpected for listen FD #%i.",
1975             pollfds[i].fd);
1976         continue;
1977       }
1978
1979       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1980       if (client_sock == NULL)
1981       {
1982         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1983         continue;
1984       }
1985       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1986
1987       client_sa_size = sizeof (client_sa);
1988       client_sock->fd = accept (pollfds[i].fd,
1989           (struct sockaddr *) &client_sa, &client_sa_size);
1990       if (client_sock->fd < 0)
1991       {
1992         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1993         free(client_sock);
1994         continue;
1995       }
1996
1997       pthread_attr_init (&attr);
1998       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1999
2000       status = pthread_create (&tid, &attr, connection_thread_main,
2001                                client_sock);
2002       if (status != 0)
2003       {
2004         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2005         close (client_sock->fd);
2006         free (client_sock);
2007         continue;
2008       }
2009     } /* for (pollfds_num) */
2010   } /* while (do_shutdown == 0) */
2011
2012   RRDD_LOG(LOG_INFO, "starting shutdown");
2013
2014   close_listen_sockets ();
2015
2016   pthread_mutex_lock (&connection_threads_lock);
2017   while (connection_threads_num > 0)
2018   {
2019     pthread_t wait_for;
2020
2021     wait_for = connection_threads[0];
2022
2023     pthread_mutex_unlock (&connection_threads_lock);
2024     pthread_join (wait_for, /* retval = */ NULL);
2025     pthread_mutex_lock (&connection_threads_lock);
2026   }
2027   pthread_mutex_unlock (&connection_threads_lock);
2028
2029   return (NULL);
2030 } /* }}} void *listen_thread_main */
2031
2032 static int daemonize (void) /* {{{ */
2033 {
2034   int status;
2035   int fd;
2036   char *base_dir;
2037
2038   fd = open_pidfile();
2039   if (fd < 0) return fd;
2040
2041   if (!stay_foreground)
2042   {
2043     pid_t child;
2044
2045     child = fork ();
2046     if (child < 0)
2047     {
2048       fprintf (stderr, "daemonize: fork(2) failed.\n");
2049       return (-1);
2050     }
2051     else if (child > 0)
2052     {
2053       return (1);
2054     }
2055
2056     /* Become session leader */
2057     setsid ();
2058
2059     /* Open the first three file descriptors to /dev/null */
2060     close (2);
2061     close (1);
2062     close (0);
2063
2064     open ("/dev/null", O_RDWR);
2065     dup (0);
2066     dup (0);
2067   } /* if (!stay_foreground) */
2068
2069   /* Change into the /tmp directory. */
2070   base_dir = (config_base_dir != NULL)
2071     ? config_base_dir
2072     : "/tmp";
2073   status = chdir (base_dir);
2074   if (status != 0)
2075   {
2076     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2077     return (-1);
2078   }
2079
2080   install_signal_handlers();
2081
2082   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2083   RRDD_LOG(LOG_INFO, "starting up");
2084
2085   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2086   if (cache_tree == NULL)
2087   {
2088     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2089     return (-1);
2090   }
2091
2092   status = write_pidfile (fd);
2093   return status;
2094 } /* }}} int daemonize */
2095
2096 static int cleanup (void) /* {{{ */
2097 {
2098   do_shutdown++;
2099
2100   pthread_cond_signal (&cache_cond);
2101   pthread_join (queue_thread, /* return = */ NULL);
2102
2103   remove_pidfile ();
2104
2105   RRDD_LOG(LOG_INFO, "goodbye");
2106   closelog ();
2107
2108   return (0);
2109 } /* }}} int cleanup */
2110
2111 static int read_options (int argc, char **argv) /* {{{ */
2112 {
2113   int option;
2114   int status = 0;
2115
2116   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2117   {
2118     switch (option)
2119     {
2120       case 'g':
2121         stay_foreground=1;
2122         break;
2123
2124       case 'L':
2125       case 'l':
2126       {
2127         listen_socket_t **temp;
2128         listen_socket_t *new;
2129
2130         new = malloc(sizeof(listen_socket_t));
2131         if (new == NULL)
2132         {
2133           fprintf(stderr, "read_options: malloc failed.\n");
2134           return(2);
2135         }
2136         memset(new, 0, sizeof(listen_socket_t));
2137
2138         temp = (listen_socket_t **) realloc (config_listen_address_list,
2139             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2140         if (temp == NULL)
2141         {
2142           fprintf (stderr, "read_options: realloc failed.\n");
2143           return (2);
2144         }
2145         config_listen_address_list = temp;
2146
2147         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2148         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2149
2150         temp[config_listen_address_list_len] = new;
2151         config_listen_address_list_len++;
2152       }
2153       break;
2154
2155       case 'f':
2156       {
2157         int temp;
2158
2159         temp = atoi (optarg);
2160         if (temp > 0)
2161           config_flush_interval = temp;
2162         else
2163         {
2164           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2165           status = 3;
2166         }
2167       }
2168       break;
2169
2170       case 'w':
2171       {
2172         int temp;
2173
2174         temp = atoi (optarg);
2175         if (temp > 0)
2176           config_write_interval = temp;
2177         else
2178         {
2179           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2180           status = 2;
2181         }
2182       }
2183       break;
2184
2185       case 'z':
2186       {
2187         int temp;
2188
2189         temp = atoi(optarg);
2190         if (temp > 0)
2191           config_write_jitter = temp;
2192         else
2193         {
2194           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2195           status = 2;
2196         }
2197
2198         break;
2199       }
2200
2201       case 'B':
2202         config_write_base_only = 1;
2203         break;
2204
2205       case 'b':
2206       {
2207         size_t len;
2208
2209         if (config_base_dir != NULL)
2210           free (config_base_dir);
2211         config_base_dir = strdup (optarg);
2212         if (config_base_dir == NULL)
2213         {
2214           fprintf (stderr, "read_options: strdup failed.\n");
2215           return (3);
2216         }
2217
2218         len = strlen (config_base_dir);
2219         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2220         {
2221           config_base_dir[len - 1] = 0;
2222           len--;
2223         }
2224
2225         if (len < 1)
2226         {
2227           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2228           return (4);
2229         }
2230
2231         _config_base_dir_len = len;
2232       }
2233       break;
2234
2235       case 'p':
2236       {
2237         if (config_pid_file != NULL)
2238           free (config_pid_file);
2239         config_pid_file = strdup (optarg);
2240         if (config_pid_file == NULL)
2241         {
2242           fprintf (stderr, "read_options: strdup failed.\n");
2243           return (3);
2244         }
2245       }
2246       break;
2247
2248       case 'F':
2249         config_flush_at_shutdown = 1;
2250         break;
2251
2252       case 'j':
2253       {
2254         struct stat statbuf;
2255         const char *dir = optarg;
2256
2257         status = stat(dir, &statbuf);
2258         if (status != 0)
2259         {
2260           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2261           return 6;
2262         }
2263
2264         if (!S_ISDIR(statbuf.st_mode)
2265             || access(dir, R_OK|W_OK|X_OK) != 0)
2266         {
2267           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2268                   errno ? rrd_strerror(errno) : "");
2269           return 6;
2270         }
2271
2272         journal_cur = malloc(PATH_MAX + 1);
2273         journal_old = malloc(PATH_MAX + 1);
2274         if (journal_cur == NULL || journal_old == NULL)
2275         {
2276           fprintf(stderr, "malloc failure for journal files\n");
2277           return 6;
2278         }
2279         else 
2280         {
2281           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2282           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2283         }
2284       }
2285       break;
2286
2287       case 'h':
2288       case '?':
2289         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2290             "\n"
2291             "Usage: rrdcached [options]\n"
2292             "\n"
2293             "Valid options are:\n"
2294             "  -l <address>  Socket address to listen to.\n"
2295             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2296             "  -w <seconds>  Interval in which to write data.\n"
2297             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2298             "  -f <seconds>  Interval in which to flush dead data.\n"
2299             "  -p <file>     Location of the PID-file.\n"
2300             "  -b <dir>      Base directory to change to.\n"
2301             "  -B            Restrict file access to paths within -b <dir>\n"
2302             "  -g            Do not fork and run in the foreground.\n"
2303             "  -j <dir>      Directory in which to create the journal files.\n"
2304             "  -F            Always flush all updates at shutdown\n"
2305             "\n"
2306             "For more information and a detailed description of all options "
2307             "please refer\n"
2308             "to the rrdcached(1) manual page.\n",
2309             VERSION);
2310         status = -1;
2311         break;
2312     } /* switch (option) */
2313   } /* while (getopt) */
2314
2315   /* advise the user when values are not sane */
2316   if (config_flush_interval < 2 * config_write_interval)
2317     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2318             " 2x write interval (-w) !\n");
2319   if (config_write_jitter > config_write_interval)
2320     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2321             " write interval (-w) !\n");
2322
2323   if (config_write_base_only && config_base_dir == NULL)
2324     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2325             "  Consult the rrdcached documentation\n");
2326
2327   if (journal_cur == NULL)
2328     config_flush_at_shutdown = 1;
2329
2330   return (status);
2331 } /* }}} int read_options */
2332
2333 int main (int argc, char **argv)
2334 {
2335   int status;
2336
2337   status = read_options (argc, argv);
2338   if (status != 0)
2339   {
2340     if (status < 0)
2341       status = 0;
2342     return (status);
2343   }
2344
2345   status = daemonize ();
2346   if (status == 1)
2347   {
2348     struct sigaction sigchld;
2349
2350     memset (&sigchld, 0, sizeof (sigchld));
2351     sigchld.sa_handler = SIG_IGN;
2352     sigaction (SIGCHLD, &sigchld, NULL);
2353
2354     return (0);
2355   }
2356   else if (status != 0)
2357   {
2358     fprintf (stderr, "daemonize failed, exiting.\n");
2359     return (1);
2360   }
2361
2362   if (journal_cur != NULL)
2363   {
2364     int had_journal = 0;
2365
2366     pthread_mutex_lock(&journal_lock);
2367
2368     RRDD_LOG(LOG_INFO, "checking for journal files");
2369
2370     had_journal += journal_replay(journal_old);
2371     had_journal += journal_replay(journal_cur);
2372
2373     if (had_journal)
2374       flush_old_values(-1);
2375
2376     pthread_mutex_unlock(&journal_lock);
2377     journal_rotate();
2378
2379     RRDD_LOG(LOG_INFO, "journal processing complete");
2380   }
2381
2382   /* start the queue thread */
2383   memset (&queue_thread, 0, sizeof (queue_thread));
2384   status = pthread_create (&queue_thread,
2385                            NULL, /* attr */
2386                            queue_thread_main,
2387                            NULL); /* args */
2388   if (status != 0)
2389   {
2390     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2391     cleanup();
2392     return (1);
2393   }
2394
2395   listen_thread_main (NULL);
2396   cleanup ();
2397
2398   return (0);
2399 } /* int main */
2400
2401 /*
2402  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2403  */