rrdcached: Let -s affect the following sockets only. This way, it's possible
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
110
111 #include <glib-2.0/glib.h>
112 /* }}} */
113
114 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
115
116 #ifndef __GNUC__
117 # define __attribute__(x) /**/
118 #endif
119
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124
125 struct listen_socket_s
126 {
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
130
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
134
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
139
140   char *wbuf;
141   ssize_t wbuf_len;
142
143   uint32_t permissions;
144
145   gid_t socket_group;
146 };
147 typedef struct listen_socket_s listen_socket_t;
148
149 struct command_s;
150 typedef struct command_s command_t;
151 /* note: guard against "unused" warnings in the handlers */
152 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
153                         time_t now              __attribute__((unused)),\
154                         char  *buffer           __attribute__((unused)),\
155                         size_t buffer_size      __attribute__((unused))
156
157 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
158                         DISPATCH_PROTO
159
160 struct command_s {
161   char   *cmd;
162   int (*handler)(HANDLER_PROTO);
163
164   char  context;                /* where we expect to see it */
165 #define CMD_CONTEXT_CLIENT      (1<<0)
166 #define CMD_CONTEXT_BATCH       (1<<1)
167 #define CMD_CONTEXT_JOURNAL     (1<<2)
168 #define CMD_CONTEXT_ANY         (0x7f)
169
170   char *syntax;
171   char *help;
172 };
173
174 struct cache_item_s;
175 typedef struct cache_item_s cache_item_t;
176 struct cache_item_s
177 {
178   char *file;
179   char **values;
180   size_t values_num;
181   time_t last_flush_time;
182   time_t last_update_stamp;
183 #define CI_FLAGS_IN_TREE  (1<<0)
184 #define CI_FLAGS_IN_QUEUE (1<<1)
185   int flags;
186   pthread_cond_t  flushed;
187   cache_item_t *prev;
188   cache_item_t *next;
189 };
190
191 struct callback_flush_data_s
192 {
193   time_t now;
194   time_t abs_timeout;
195   char **keys;
196   size_t keys_num;
197 };
198 typedef struct callback_flush_data_s callback_flush_data_t;
199
200 enum queue_side_e
201 {
202   HEAD,
203   TAIL
204 };
205 typedef enum queue_side_e queue_side_t;
206
207 /* describe a set of journal files */
208 typedef struct {
209   char **files;
210   size_t files_num;
211 } journal_set;
212
213 /* max length of socket command or response */
214 #define CMD_MAX 4096
215 #define RBUF_SIZE (CMD_MAX*2)
216
217 /*
218  * Variables
219  */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
222
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
225
226 enum {
227   RUNNING,              /* normal operation */
228   FLUSHING,             /* flushing remaining values */
229   SHUTDOWN              /* shutting down */
230 } state = RUNNING;
231
232 static pthread_t *queue_threads;
233 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
234 static int config_queue_threads = 4;
235
236 static pthread_t flush_thread;
237 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
238
239 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
240 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
241 static int connection_threads_num = 0;
242
243 /* Cache stuff */
244 static GTree          *cache_tree = NULL;
245 static cache_item_t   *cache_queue_head = NULL;
246 static cache_item_t   *cache_queue_tail = NULL;
247 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
248
249 static int config_write_interval = 300;
250 static int config_write_jitter   = 0;
251 static int config_flush_interval = 3600;
252 static int config_flush_at_shutdown = 0;
253 static char *config_pid_file = NULL;
254 static char *config_base_dir = NULL;
255 static size_t _config_base_dir_len = 0;
256 static int config_write_base_only = 0;
257
258 static listen_socket_t **config_listen_address_list = NULL;
259 static size_t config_listen_address_list_len = 0;
260
261 static uint64_t stats_queue_length = 0;
262 static uint64_t stats_updates_received = 0;
263 static uint64_t stats_flush_received = 0;
264 static uint64_t stats_updates_written = 0;
265 static uint64_t stats_data_sets_written = 0;
266 static uint64_t stats_journal_bytes = 0;
267 static uint64_t stats_journal_rotate = 0;
268 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
269
270 /* Journaled updates */
271 #define JOURNAL_REPLAY(s) ((s) == NULL)
272 #define JOURNAL_BASE "rrd.journal"
273 static journal_set *journal_cur = NULL;
274 static journal_set *journal_old = NULL;
275 static char *journal_dir = NULL;
276 static FILE *journal_fh = NULL;         /* current journal file handle */
277 static long  journal_size = 0;          /* current journal size */
278 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
279 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
280 static int journal_write(char *cmd, char *args);
281 static void journal_done(void);
282 static void journal_rotate(void);
283
284 /* prototypes for forward refernces */
285 static int handle_request_help (HANDLER_PROTO);
286
287 /* 
288  * Functions
289  */
290 static void sig_common (const char *sig) /* {{{ */
291 {
292   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
293   state = FLUSHING;
294   pthread_cond_broadcast(&flush_cond);
295   pthread_cond_broadcast(&queue_cond);
296 } /* }}} void sig_common */
297
298 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
299 {
300   sig_common("INT");
301 } /* }}} void sig_int_handler */
302
303 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
304 {
305   sig_common("TERM");
306 } /* }}} void sig_term_handler */
307
308 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
309 {
310   config_flush_at_shutdown = 1;
311   sig_common("USR1");
312 } /* }}} void sig_usr1_handler */
313
314 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
315 {
316   config_flush_at_shutdown = 0;
317   sig_common("USR2");
318 } /* }}} void sig_usr2_handler */
319
320 static void install_signal_handlers(void) /* {{{ */
321 {
322   /* These structures are static, because `sigaction' behaves weird if the are
323    * overwritten.. */
324   static struct sigaction sa_int;
325   static struct sigaction sa_term;
326   static struct sigaction sa_pipe;
327   static struct sigaction sa_usr1;
328   static struct sigaction sa_usr2;
329
330   /* Install signal handlers */
331   memset (&sa_int, 0, sizeof (sa_int));
332   sa_int.sa_handler = sig_int_handler;
333   sigaction (SIGINT, &sa_int, NULL);
334
335   memset (&sa_term, 0, sizeof (sa_term));
336   sa_term.sa_handler = sig_term_handler;
337   sigaction (SIGTERM, &sa_term, NULL);
338
339   memset (&sa_pipe, 0, sizeof (sa_pipe));
340   sa_pipe.sa_handler = SIG_IGN;
341   sigaction (SIGPIPE, &sa_pipe, NULL);
342
343   memset (&sa_pipe, 0, sizeof (sa_usr1));
344   sa_usr1.sa_handler = sig_usr1_handler;
345   sigaction (SIGUSR1, &sa_usr1, NULL);
346
347   memset (&sa_usr2, 0, sizeof (sa_usr2));
348   sa_usr2.sa_handler = sig_usr2_handler;
349   sigaction (SIGUSR2, &sa_usr2, NULL);
350
351 } /* }}} void install_signal_handlers */
352
353 static int open_pidfile(char *action, int oflag) /* {{{ */
354 {
355   int fd;
356   const char *file;
357   char *file_copy, *dir;
358
359   file = (config_pid_file != NULL)
360     ? config_pid_file
361     : LOCALSTATEDIR "/run/rrdcached.pid";
362
363   /* dirname may modify its argument */
364   file_copy = strdup(file);
365   if (file_copy == NULL)
366   {
367     fprintf(stderr, "rrdcached: strdup(): %s\n",
368         rrd_strerror(errno));
369     return -1;
370   }
371
372   dir = dirname(file_copy);
373   if (rrd_mkdir_p(dir, 0777) != 0)
374   {
375     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
376         dir, rrd_strerror(errno));
377     return -1;
378   }
379
380   free(file_copy);
381
382   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
383   if (fd < 0)
384     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
385             action, file, rrd_strerror(errno));
386
387   return(fd);
388 } /* }}} static int open_pidfile */
389
390 /* check existing pid file to see whether a daemon is running */
391 static int check_pidfile(void)
392 {
393   int pid_fd;
394   pid_t pid;
395   char pid_str[16];
396
397   pid_fd = open_pidfile("open", O_RDWR);
398   if (pid_fd < 0)
399     return pid_fd;
400
401   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
402     return -1;
403
404   pid = atoi(pid_str);
405   if (pid <= 0)
406     return -1;
407
408   /* another running process that we can signal COULD be
409    * a competing rrdcached */
410   if (pid != getpid() && kill(pid, 0) == 0)
411   {
412     fprintf(stderr,
413             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
414     close(pid_fd);
415     return -1;
416   }
417
418   lseek(pid_fd, 0, SEEK_SET);
419   if (ftruncate(pid_fd, 0) == -1)
420   {
421     fprintf(stderr,
422             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
423     close(pid_fd);
424     return -1;
425   }
426
427   fprintf(stderr,
428           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
429           "rrdcached: starting normally.\n", pid);
430
431   return pid_fd;
432 } /* }}} static int check_pidfile */
433
434 static int write_pidfile (int fd) /* {{{ */
435 {
436   pid_t pid;
437   FILE *fh;
438
439   pid = getpid ();
440
441   fh = fdopen (fd, "w");
442   if (fh == NULL)
443   {
444     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
445     close(fd);
446     return (-1);
447   }
448
449   fprintf (fh, "%i\n", (int) pid);
450   fclose (fh);
451
452   return (0);
453 } /* }}} int write_pidfile */
454
455 static int remove_pidfile (void) /* {{{ */
456 {
457   char *file;
458   int status;
459
460   file = (config_pid_file != NULL)
461     ? config_pid_file
462     : LOCALSTATEDIR "/run/rrdcached.pid";
463
464   status = unlink (file);
465   if (status == 0)
466     return (0);
467   return (errno);
468 } /* }}} int remove_pidfile */
469
470 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
471 {
472   char *eol;
473
474   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
475                sock->next_read - sock->next_cmd);
476
477   if (eol == NULL)
478   {
479     /* no commands left, move remainder back to front of rbuf */
480     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
481             sock->next_read - sock->next_cmd);
482     sock->next_read -= sock->next_cmd;
483     sock->next_cmd = 0;
484     *len = 0;
485     return NULL;
486   }
487   else
488   {
489     char *cmd = sock->rbuf + sock->next_cmd;
490     *eol = '\0';
491
492     sock->next_cmd = eol - sock->rbuf + 1;
493
494     if (eol > sock->rbuf && *(eol-1) == '\r')
495       *(--eol) = '\0'; /* handle "\r\n" EOL */
496
497     *len = eol - cmd;
498
499     return cmd;
500   }
501
502   /* NOTREACHED */
503   assert(1==0);
504 } /* }}} char *next_cmd */
505
506 /* add the characters directly to the write buffer */
507 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
508 {
509   char *new_buf;
510
511   assert(sock != NULL);
512
513   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
514   if (new_buf == NULL)
515   {
516     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
517     return -1;
518   }
519
520   strncpy(new_buf + sock->wbuf_len, str, len + 1);
521
522   sock->wbuf = new_buf;
523   sock->wbuf_len += len;
524
525   return 0;
526 } /* }}} static int add_to_wbuf */
527
528 /* add the text to the "extra" info that's sent after the status line */
529 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
530 {
531   va_list argp;
532   char buffer[CMD_MAX];
533   int len;
534
535   if (JOURNAL_REPLAY(sock)) return 0;
536   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
537
538   va_start(argp, fmt);
539 #ifdef HAVE_VSNPRINTF
540   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
541 #else
542   len = vsprintf(buffer, fmt, argp);
543 #endif
544   va_end(argp);
545   if (len < 0)
546   {
547     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
548     return -1;
549   }
550
551   return add_to_wbuf(sock, buffer, len);
552 } /* }}} static int add_response_info */
553
554 static int count_lines(char *str) /* {{{ */
555 {
556   int lines = 0;
557
558   if (str != NULL)
559   {
560     while ((str = strchr(str, '\n')) != NULL)
561     {
562       ++lines;
563       ++str;
564     }
565   }
566
567   return lines;
568 } /* }}} static int count_lines */
569
570 /* send the response back to the user.
571  * returns 0 on success, -1 on error
572  * write buffer is always zeroed after this call */
573 static int send_response (listen_socket_t *sock, response_code rc,
574                           char *fmt, ...) /* {{{ */
575 {
576   va_list argp;
577   char buffer[CMD_MAX];
578   int lines;
579   ssize_t wrote;
580   int rclen, len;
581
582   if (JOURNAL_REPLAY(sock)) return rc;
583
584   if (sock->batch_start)
585   {
586     if (rc == RESP_OK)
587       return rc; /* no response on success during BATCH */
588     lines = sock->batch_cmd;
589   }
590   else if (rc == RESP_OK)
591     lines = count_lines(sock->wbuf);
592   else
593     lines = -1;
594
595   rclen = sprintf(buffer, "%d ", lines);
596   va_start(argp, fmt);
597 #ifdef HAVE_VSNPRINTF
598   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
599 #else
600   len = vsprintf(buffer+rclen, fmt, argp);
601 #endif
602   va_end(argp);
603   if (len < 0)
604     return -1;
605
606   len += rclen;
607
608   /* append the result to the wbuf, don't write to the user */
609   if (sock->batch_start)
610     return add_to_wbuf(sock, buffer, len);
611
612   /* first write must be complete */
613   if (len != write(sock->fd, buffer, len))
614   {
615     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
616     return -1;
617   }
618
619   if (sock->wbuf != NULL && rc == RESP_OK)
620   {
621     wrote = 0;
622     while (wrote < sock->wbuf_len)
623     {
624       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
625       if (wb <= 0)
626       {
627         RRDD_LOG(LOG_INFO, "send_response: could not write results");
628         return -1;
629       }
630       wrote += wb;
631     }
632   }
633
634   free(sock->wbuf); sock->wbuf = NULL;
635   sock->wbuf_len = 0;
636
637   return 0;
638 } /* }}} */
639
640 static void wipe_ci_values(cache_item_t *ci, time_t when)
641 {
642   ci->values = NULL;
643   ci->values_num = 0;
644
645   ci->last_flush_time = when;
646   if (config_write_jitter > 0)
647     ci->last_flush_time += (rrd_random() % config_write_jitter);
648 }
649
650 /* remove_from_queue
651  * remove a "cache_item_t" item from the queue.
652  * must hold 'cache_lock' when calling this
653  */
654 static void remove_from_queue(cache_item_t *ci) /* {{{ */
655 {
656   if (ci == NULL) return;
657   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
658
659   if (ci->prev == NULL)
660     cache_queue_head = ci->next; /* reset head */
661   else
662     ci->prev->next = ci->next;
663
664   if (ci->next == NULL)
665     cache_queue_tail = ci->prev; /* reset the tail */
666   else
667     ci->next->prev = ci->prev;
668
669   ci->next = ci->prev = NULL;
670   ci->flags &= ~CI_FLAGS_IN_QUEUE;
671
672   pthread_mutex_lock (&stats_lock);
673   assert (stats_queue_length > 0);
674   stats_queue_length--;
675   pthread_mutex_unlock (&stats_lock);
676
677 } /* }}} static void remove_from_queue */
678
679 /* free the resources associated with the cache_item_t
680  * must hold cache_lock when calling this function
681  */
682 static void *free_cache_item(cache_item_t *ci) /* {{{ */
683 {
684   if (ci == NULL) return NULL;
685
686   remove_from_queue(ci);
687
688   for (size_t i=0; i < ci->values_num; i++)
689     free(ci->values[i]);
690
691   free (ci->values);
692   free (ci->file);
693
694   /* in case anyone is waiting */
695   pthread_cond_broadcast(&ci->flushed);
696   pthread_cond_destroy(&ci->flushed);
697
698   free (ci);
699
700   return NULL;
701 } /* }}} static void *free_cache_item */
702
703 /*
704  * enqueue_cache_item:
705  * `cache_lock' must be acquired before calling this function!
706  */
707 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
708     queue_side_t side)
709 {
710   if (ci == NULL)
711     return (-1);
712
713   if (ci->values_num == 0)
714     return (0);
715
716   if (side == HEAD)
717   {
718     if (cache_queue_head == ci)
719       return 0;
720
721     /* remove if further down in queue */
722     remove_from_queue(ci);
723
724     ci->prev = NULL;
725     ci->next = cache_queue_head;
726     if (ci->next != NULL)
727       ci->next->prev = ci;
728     cache_queue_head = ci;
729
730     if (cache_queue_tail == NULL)
731       cache_queue_tail = cache_queue_head;
732   }
733   else /* (side == TAIL) */
734   {
735     /* We don't move values back in the list.. */
736     if (ci->flags & CI_FLAGS_IN_QUEUE)
737       return (0);
738
739     assert (ci->next == NULL);
740     assert (ci->prev == NULL);
741
742     ci->prev = cache_queue_tail;
743
744     if (cache_queue_tail == NULL)
745       cache_queue_head = ci;
746     else
747       cache_queue_tail->next = ci;
748
749     cache_queue_tail = ci;
750   }
751
752   ci->flags |= CI_FLAGS_IN_QUEUE;
753
754   pthread_cond_signal(&queue_cond);
755   pthread_mutex_lock (&stats_lock);
756   stats_queue_length++;
757   pthread_mutex_unlock (&stats_lock);
758
759   return (0);
760 } /* }}} int enqueue_cache_item */
761
762 /*
763  * tree_callback_flush:
764  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
765  * while this is in progress.
766  */
767 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
768     gpointer data)
769 {
770   cache_item_t *ci;
771   callback_flush_data_t *cfd;
772
773   ci = (cache_item_t *) value;
774   cfd = (callback_flush_data_t *) data;
775
776   if (ci->flags & CI_FLAGS_IN_QUEUE)
777     return FALSE;
778
779   if (ci->values_num > 0
780       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
781   {
782     enqueue_cache_item (ci, TAIL);
783   }
784   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
785       && (ci->values_num <= 0))
786   {
787     assert ((char *) key == ci->file);
788     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
789     {
790       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
791       return (FALSE);
792     }
793   }
794
795   return (FALSE);
796 } /* }}} gboolean tree_callback_flush */
797
798 static int flush_old_values (int max_age)
799 {
800   callback_flush_data_t cfd;
801   size_t k;
802
803   memset (&cfd, 0, sizeof (cfd));
804   /* Pass the current time as user data so that we don't need to call
805    * `time' for each node. */
806   cfd.now = time (NULL);
807   cfd.keys = NULL;
808   cfd.keys_num = 0;
809
810   if (max_age > 0)
811     cfd.abs_timeout = cfd.now - max_age;
812   else
813     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
814
815   /* `tree_callback_flush' will return the keys of all values that haven't
816    * been touched in the last `config_flush_interval' seconds in `cfd'.
817    * The char*'s in this array point to the same memory as ci->file, so we
818    * don't need to free them separately. */
819   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
820
821   for (k = 0; k < cfd.keys_num; k++)
822   {
823     /* should never fail, since we have held the cache_lock
824      * the entire time */
825     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
826   }
827
828   if (cfd.keys != NULL)
829   {
830     free (cfd.keys);
831     cfd.keys = NULL;
832   }
833
834   return (0);
835 } /* int flush_old_values */
836
837 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
838 {
839   struct timeval now;
840   struct timespec next_flush;
841   int status;
842
843   gettimeofday (&now, NULL);
844   next_flush.tv_sec = now.tv_sec + config_flush_interval;
845   next_flush.tv_nsec = 1000 * now.tv_usec;
846
847   pthread_mutex_lock(&cache_lock);
848
849   while (state == RUNNING)
850   {
851     gettimeofday (&now, NULL);
852     if ((now.tv_sec > next_flush.tv_sec)
853         || ((now.tv_sec == next_flush.tv_sec)
854           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
855     {
856       RRDD_LOG(LOG_DEBUG, "flushing old values");
857
858       /* Determine the time of the next cache flush. */
859       next_flush.tv_sec = now.tv_sec + config_flush_interval;
860
861       /* Flush all values that haven't been written in the last
862        * `config_write_interval' seconds. */
863       flush_old_values (config_write_interval);
864
865       /* unlock the cache while we rotate so we don't block incoming
866        * updates if the fsync() blocks on disk I/O */
867       pthread_mutex_unlock(&cache_lock);
868       journal_rotate();
869       pthread_mutex_lock(&cache_lock);
870     }
871
872     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
873     if (status != 0 && status != ETIMEDOUT)
874     {
875       RRDD_LOG (LOG_ERR, "flush_thread_main: "
876                 "pthread_cond_timedwait returned %i.", status);
877     }
878   }
879
880   if (config_flush_at_shutdown)
881     flush_old_values (-1); /* flush everything */
882
883   state = SHUTDOWN;
884
885   pthread_mutex_unlock(&cache_lock);
886
887   return NULL;
888 } /* void *flush_thread_main */
889
890 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
891 {
892   pthread_mutex_lock (&cache_lock);
893
894   while (state != SHUTDOWN
895          || (cache_queue_head != NULL && config_flush_at_shutdown))
896   {
897     cache_item_t *ci;
898     char *file;
899     char **values;
900     size_t values_num;
901     int status;
902
903     /* Now, check if there's something to store away. If not, wait until
904      * something comes in. */
905     if (cache_queue_head == NULL)
906     {
907       status = pthread_cond_wait (&queue_cond, &cache_lock);
908       if ((status != 0) && (status != ETIMEDOUT))
909       {
910         RRDD_LOG (LOG_ERR, "queue_thread_main: "
911             "pthread_cond_wait returned %i.", status);
912       }
913     }
914
915     /* Check if a value has arrived. This may be NULL if we timed out or there
916      * was an interrupt such as a signal. */
917     if (cache_queue_head == NULL)
918       continue;
919
920     ci = cache_queue_head;
921
922     /* copy the relevant parts */
923     file = strdup (ci->file);
924     if (file == NULL)
925     {
926       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
927       continue;
928     }
929
930     assert(ci->values != NULL);
931     assert(ci->values_num > 0);
932
933     values = ci->values;
934     values_num = ci->values_num;
935
936     wipe_ci_values(ci, time(NULL));
937     remove_from_queue(ci);
938
939     pthread_mutex_unlock (&cache_lock);
940
941     rrd_clear_error ();
942     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
943     if (status != 0)
944     {
945       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
946           "rrd_update_r (%s) failed with status %i. (%s)",
947           file, status, rrd_get_error());
948     }
949
950     journal_write("wrote", file);
951
952     /* Search again in the tree.  It's possible someone issued a "FORGET"
953      * while we were writing the update values. */
954     pthread_mutex_lock(&cache_lock);
955     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
956     if (ci)
957       pthread_cond_broadcast(&ci->flushed);
958     pthread_mutex_unlock(&cache_lock);
959
960     if (status == 0)
961     {
962       pthread_mutex_lock (&stats_lock);
963       stats_updates_written++;
964       stats_data_sets_written += values_num;
965       pthread_mutex_unlock (&stats_lock);
966     }
967
968     rrd_free_ptrs((void ***) &values, &values_num);
969     free(file);
970
971     pthread_mutex_lock (&cache_lock);
972   }
973   pthread_mutex_unlock (&cache_lock);
974
975   return (NULL);
976 } /* }}} void *queue_thread_main */
977
978 static int buffer_get_field (char **buffer_ret, /* {{{ */
979     size_t *buffer_size_ret, char **field_ret)
980 {
981   char *buffer;
982   size_t buffer_pos;
983   size_t buffer_size;
984   char *field;
985   size_t field_size;
986   int status;
987
988   buffer = *buffer_ret;
989   buffer_pos = 0;
990   buffer_size = *buffer_size_ret;
991   field = *buffer_ret;
992   field_size = 0;
993
994   if (buffer_size <= 0)
995     return (-1);
996
997   /* This is ensured by `handle_request'. */
998   assert (buffer[buffer_size - 1] == '\0');
999
1000   status = -1;
1001   while (buffer_pos < buffer_size)
1002   {
1003     /* Check for end-of-field or end-of-buffer */
1004     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1005     {
1006       field[field_size] = 0;
1007       field_size++;
1008       buffer_pos++;
1009       status = 0;
1010       break;
1011     }
1012     /* Handle escaped characters. */
1013     else if (buffer[buffer_pos] == '\\')
1014     {
1015       if (buffer_pos >= (buffer_size - 1))
1016         break;
1017       buffer_pos++;
1018       field[field_size] = buffer[buffer_pos];
1019       field_size++;
1020       buffer_pos++;
1021     }
1022     /* Normal operation */ 
1023     else
1024     {
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029   } /* while (buffer_pos < buffer_size) */
1030
1031   if (status != 0)
1032     return (status);
1033
1034   *buffer_ret = buffer + buffer_pos;
1035   *buffer_size_ret = buffer_size - buffer_pos;
1036   *field_ret = field;
1037
1038   return (0);
1039 } /* }}} int buffer_get_field */
1040
1041 /* if we're restricting writes to the base directory,
1042  * check whether the file falls within the dir
1043  * returns 1 if OK, otherwise 0
1044  */
1045 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1046 {
1047   assert(file != NULL);
1048
1049   if (!config_write_base_only
1050       || JOURNAL_REPLAY(sock)
1051       || config_base_dir == NULL)
1052     return 1;
1053
1054   if (strstr(file, "../") != NULL) goto err;
1055
1056   /* relative paths without "../" are ok */
1057   if (*file != '/') return 1;
1058
1059   /* file must be of the format base + "/" + <1+ char filename> */
1060   if (strlen(file) < _config_base_dir_len + 2) goto err;
1061   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1062   if (*(file + _config_base_dir_len) != '/') goto err;
1063
1064   return 1;
1065
1066 err:
1067   if (sock != NULL && sock->fd >= 0)
1068     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1069
1070   return 0;
1071 } /* }}} static int check_file_access */
1072
1073 /* when using a base dir, convert relative paths to absolute paths.
1074  * if necessary, modifies the "filename" pointer to point
1075  * to the new path created in "tmp".  "tmp" is provided
1076  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1077  *
1078  * this allows us to optimize for the expected case (absolute path)
1079  * with a no-op.
1080  */
1081 static void get_abs_path(char **filename, char *tmp)
1082 {
1083   assert(tmp != NULL);
1084   assert(filename != NULL && *filename != NULL);
1085
1086   if (config_base_dir == NULL || **filename == '/')
1087     return;
1088
1089   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1090   *filename = tmp;
1091 } /* }}} static int get_abs_path */
1092
1093 static int flush_file (const char *filename) /* {{{ */
1094 {
1095   cache_item_t *ci;
1096
1097   pthread_mutex_lock (&cache_lock);
1098
1099   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1100   if (ci == NULL)
1101   {
1102     pthread_mutex_unlock (&cache_lock);
1103     return (ENOENT);
1104   }
1105
1106   if (ci->values_num > 0)
1107   {
1108     /* Enqueue at head */
1109     enqueue_cache_item (ci, HEAD);
1110     pthread_cond_wait(&ci->flushed, &cache_lock);
1111   }
1112
1113   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1114    * may have been purged during our cond_wait() */
1115
1116   pthread_mutex_unlock(&cache_lock);
1117
1118   return (0);
1119 } /* }}} int flush_file */
1120
1121 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1122 {
1123   char *err = "Syntax error.\n";
1124
1125   if (cmd && cmd->syntax)
1126     err = cmd->syntax;
1127
1128   return send_response(sock, RESP_ERR, "Usage: %s", err);
1129 } /* }}} static int syntax_error() */
1130
1131 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1132 {
1133   uint64_t copy_queue_length;
1134   uint64_t copy_updates_received;
1135   uint64_t copy_flush_received;
1136   uint64_t copy_updates_written;
1137   uint64_t copy_data_sets_written;
1138   uint64_t copy_journal_bytes;
1139   uint64_t copy_journal_rotate;
1140
1141   uint64_t tree_nodes_number;
1142   uint64_t tree_depth;
1143
1144   pthread_mutex_lock (&stats_lock);
1145   copy_queue_length       = stats_queue_length;
1146   copy_updates_received   = stats_updates_received;
1147   copy_flush_received     = stats_flush_received;
1148   copy_updates_written    = stats_updates_written;
1149   copy_data_sets_written  = stats_data_sets_written;
1150   copy_journal_bytes      = stats_journal_bytes;
1151   copy_journal_rotate     = stats_journal_rotate;
1152   pthread_mutex_unlock (&stats_lock);
1153
1154   pthread_mutex_lock (&cache_lock);
1155   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1156   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1157   pthread_mutex_unlock (&cache_lock);
1158
1159   add_response_info(sock,
1160                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1161   add_response_info(sock,
1162                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1163   add_response_info(sock,
1164                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1165   add_response_info(sock,
1166                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1167   add_response_info(sock,
1168                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1169   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1170   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1171   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1172   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1173
1174   send_response(sock, RESP_OK, "Statistics follow\n");
1175
1176   return (0);
1177 } /* }}} int handle_request_stats */
1178
1179 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1180 {
1181   char *file, file_tmp[PATH_MAX];
1182   int status;
1183
1184   status = buffer_get_field (&buffer, &buffer_size, &file);
1185   if (status != 0)
1186   {
1187     return syntax_error(sock,cmd);
1188   }
1189   else
1190   {
1191     pthread_mutex_lock(&stats_lock);
1192     stats_flush_received++;
1193     pthread_mutex_unlock(&stats_lock);
1194
1195     get_abs_path(&file, file_tmp);
1196     if (!check_file_access(file, sock)) return 0;
1197
1198     status = flush_file (file);
1199     if (status == 0)
1200       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1201     else if (status == ENOENT)
1202     {
1203       /* no file in our tree; see whether it exists at all */
1204       struct stat statbuf;
1205
1206       memset(&statbuf, 0, sizeof(statbuf));
1207       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1208         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1209       else
1210         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1211     }
1212     else if (status < 0)
1213       return send_response(sock, RESP_ERR, "Internal error.\n");
1214     else
1215       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1216   }
1217
1218   /* NOTREACHED */
1219   assert(1==0);
1220 } /* }}} int handle_request_flush */
1221
1222 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1223 {
1224   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1225
1226   pthread_mutex_lock(&cache_lock);
1227   flush_old_values(-1);
1228   pthread_mutex_unlock(&cache_lock);
1229
1230   return send_response(sock, RESP_OK, "Started flush.\n");
1231 } /* }}} static int handle_request_flushall */
1232
1233 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1234 {
1235   int status;
1236   char *file, file_tmp[PATH_MAX];
1237   cache_item_t *ci;
1238
1239   status = buffer_get_field(&buffer, &buffer_size, &file);
1240   if (status != 0)
1241     return syntax_error(sock,cmd);
1242
1243   get_abs_path(&file, file_tmp);
1244
1245   pthread_mutex_lock(&cache_lock);
1246   ci = g_tree_lookup(cache_tree, file);
1247   if (ci == NULL)
1248   {
1249     pthread_mutex_unlock(&cache_lock);
1250     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1251   }
1252
1253   for (size_t i=0; i < ci->values_num; i++)
1254     add_response_info(sock, "%s\n", ci->values[i]);
1255
1256   pthread_mutex_unlock(&cache_lock);
1257   return send_response(sock, RESP_OK, "updates pending\n");
1258 } /* }}} static int handle_request_pending */
1259
1260 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1261 {
1262   int status;
1263   gboolean found;
1264   char *file, file_tmp[PATH_MAX];
1265
1266   status = buffer_get_field(&buffer, &buffer_size, &file);
1267   if (status != 0)
1268     return syntax_error(sock,cmd);
1269
1270   get_abs_path(&file, file_tmp);
1271   if (!check_file_access(file, sock)) return 0;
1272
1273   pthread_mutex_lock(&cache_lock);
1274   found = g_tree_remove(cache_tree, file);
1275   pthread_mutex_unlock(&cache_lock);
1276
1277   if (found == TRUE)
1278   {
1279     if (!JOURNAL_REPLAY(sock))
1280       journal_write("forget", file);
1281
1282     return send_response(sock, RESP_OK, "Gone!\n");
1283   }
1284   else
1285     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1286
1287   /* NOTREACHED */
1288   assert(1==0);
1289 } /* }}} static int handle_request_forget */
1290
1291 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1292 {
1293   cache_item_t *ci;
1294
1295   pthread_mutex_lock(&cache_lock);
1296
1297   ci = cache_queue_head;
1298   while (ci != NULL)
1299   {
1300     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1301     ci = ci->next;
1302   }
1303
1304   pthread_mutex_unlock(&cache_lock);
1305
1306   return send_response(sock, RESP_OK, "in queue.\n");
1307 } /* }}} int handle_request_queue */
1308
1309 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1310 {
1311   char *file, file_tmp[PATH_MAX];
1312   int values_num = 0;
1313   int status;
1314   char orig_buf[CMD_MAX];
1315
1316   cache_item_t *ci;
1317
1318   /* save it for the journal later */
1319   if (!JOURNAL_REPLAY(sock))
1320     strncpy(orig_buf, buffer, buffer_size);
1321
1322   status = buffer_get_field (&buffer, &buffer_size, &file);
1323   if (status != 0)
1324     return syntax_error(sock,cmd);
1325
1326   pthread_mutex_lock(&stats_lock);
1327   stats_updates_received++;
1328   pthread_mutex_unlock(&stats_lock);
1329
1330   get_abs_path(&file, file_tmp);
1331   if (!check_file_access(file, sock)) return 0;
1332
1333   pthread_mutex_lock (&cache_lock);
1334   ci = g_tree_lookup (cache_tree, file);
1335
1336   if (ci == NULL) /* {{{ */
1337   {
1338     struct stat statbuf;
1339     cache_item_t *tmp;
1340
1341     /* don't hold the lock while we setup; stat(2) might block */
1342     pthread_mutex_unlock(&cache_lock);
1343
1344     memset (&statbuf, 0, sizeof (statbuf));
1345     status = stat (file, &statbuf);
1346     if (status != 0)
1347     {
1348       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1349
1350       status = errno;
1351       if (status == ENOENT)
1352         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1353       else
1354         return send_response(sock, RESP_ERR,
1355                              "stat failed with error %i.\n", status);
1356     }
1357     if (!S_ISREG (statbuf.st_mode))
1358       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1359
1360     if (access(file, R_OK|W_OK) != 0)
1361       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1362                            file, rrd_strerror(errno));
1363
1364     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1365     if (ci == NULL)
1366     {
1367       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1368
1369       return send_response(sock, RESP_ERR, "malloc failed.\n");
1370     }
1371     memset (ci, 0, sizeof (cache_item_t));
1372
1373     ci->file = strdup (file);
1374     if (ci->file == NULL)
1375     {
1376       free (ci);
1377       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1378
1379       return send_response(sock, RESP_ERR, "strdup failed.\n");
1380     }
1381
1382     wipe_ci_values(ci, now);
1383     ci->flags = CI_FLAGS_IN_TREE;
1384     pthread_cond_init(&ci->flushed, NULL);
1385
1386     pthread_mutex_lock(&cache_lock);
1387
1388     /* another UPDATE might have added this entry in the meantime */
1389     tmp = g_tree_lookup (cache_tree, file);
1390     if (tmp == NULL)
1391       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1392     else
1393     {
1394       free_cache_item (ci);
1395       ci = tmp;
1396     }
1397
1398     /* state may have changed while we were unlocked */
1399     if (state == SHUTDOWN)
1400       return -1;
1401   } /* }}} */
1402   assert (ci != NULL);
1403
1404   /* don't re-write updates in replay mode */
1405   if (!JOURNAL_REPLAY(sock))
1406     journal_write("update", orig_buf);
1407
1408   while (buffer_size > 0)
1409   {
1410     char *value;
1411     time_t stamp;
1412     char *eostamp;
1413
1414     status = buffer_get_field (&buffer, &buffer_size, &value);
1415     if (status != 0)
1416     {
1417       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1418       break;
1419     }
1420
1421     /* make sure update time is always moving forward */
1422     stamp = strtol(value, &eostamp, 10);
1423     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1424     {
1425       pthread_mutex_unlock(&cache_lock);
1426       return send_response(sock, RESP_ERR,
1427                            "Cannot find timestamp in '%s'!\n", value);
1428     }
1429     else if (stamp <= ci->last_update_stamp)
1430     {
1431       pthread_mutex_unlock(&cache_lock);
1432       return send_response(sock, RESP_ERR,
1433                            "illegal attempt to update using time %ld when last"
1434                            " update time is %ld (minimum one second step)\n",
1435                            stamp, ci->last_update_stamp);
1436     }
1437     else
1438       ci->last_update_stamp = stamp;
1439
1440     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1441     {
1442       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1443       continue;
1444     }
1445
1446     values_num++;
1447   }
1448
1449   if (((now - ci->last_flush_time) >= config_write_interval)
1450       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1451       && (ci->values_num > 0))
1452   {
1453     enqueue_cache_item (ci, TAIL);
1454   }
1455
1456   pthread_mutex_unlock (&cache_lock);
1457
1458   if (values_num < 1)
1459     return send_response(sock, RESP_ERR, "No values updated.\n");
1460   else
1461     return send_response(sock, RESP_OK,
1462                          "errors, enqueued %i value(s).\n", values_num);
1463
1464   /* NOTREACHED */
1465   assert(1==0);
1466
1467 } /* }}} int handle_request_update */
1468
1469 /* we came across a "WROTE" entry during journal replay.
1470  * throw away any values that we have accumulated for this file
1471  */
1472 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1473 {
1474   cache_item_t *ci;
1475   const char *file = buffer;
1476
1477   pthread_mutex_lock(&cache_lock);
1478
1479   ci = g_tree_lookup(cache_tree, file);
1480   if (ci == NULL)
1481   {
1482     pthread_mutex_unlock(&cache_lock);
1483     return (0);
1484   }
1485
1486   if (ci->values)
1487     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1488
1489   wipe_ci_values(ci, now);
1490   remove_from_queue(ci);
1491
1492   pthread_mutex_unlock(&cache_lock);
1493   return (0);
1494 } /* }}} int handle_request_wrote */
1495
1496 /* start "BATCH" processing */
1497 static int batch_start (HANDLER_PROTO) /* {{{ */
1498 {
1499   int status;
1500   if (sock->batch_start)
1501     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1502
1503   status = send_response(sock, RESP_OK,
1504                          "Go ahead.  End with dot '.' on its own line.\n");
1505   sock->batch_start = time(NULL);
1506   sock->batch_cmd = 0;
1507
1508   return status;
1509 } /* }}} static int batch_start */
1510
1511 /* finish "BATCH" processing and return results to the client */
1512 static int batch_done (HANDLER_PROTO) /* {{{ */
1513 {
1514   assert(sock->batch_start);
1515   sock->batch_start = 0;
1516   sock->batch_cmd  = 0;
1517   return send_response(sock, RESP_OK, "errors\n");
1518 } /* }}} static int batch_done */
1519
1520 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1521 {
1522   return -1;
1523 } /* }}} static int handle_request_quit */
1524
1525 static command_t list_of_commands[] = { /* {{{ */
1526   {
1527     "UPDATE",
1528     handle_request_update,
1529     CMD_CONTEXT_ANY,
1530     "UPDATE <filename> <values> [<values> ...]\n"
1531     ,
1532     "Adds the given file to the internal cache if it is not yet known and\n"
1533     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1534     "for details.\n"
1535     "\n"
1536     "Each <values> has the following form:\n"
1537     "  <values> = <time>:<value>[:<value>[...]]\n"
1538     "See the rrdupdate(1) manpage for details.\n"
1539   },
1540   {
1541     "WROTE",
1542     handle_request_wrote,
1543     CMD_CONTEXT_JOURNAL,
1544     NULL,
1545     NULL
1546   },
1547   {
1548     "FLUSH",
1549     handle_request_flush,
1550     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1551     "FLUSH <filename>\n"
1552     ,
1553     "Adds the given filename to the head of the update queue and returns\n"
1554     "after it has been dequeued.\n"
1555   },
1556   {
1557     "FLUSHALL",
1558     handle_request_flushall,
1559     CMD_CONTEXT_CLIENT,
1560     "FLUSHALL\n"
1561     ,
1562     "Triggers writing of all pending updates.  Returns immediately.\n"
1563   },
1564   {
1565     "PENDING",
1566     handle_request_pending,
1567     CMD_CONTEXT_CLIENT,
1568     "PENDING <filename>\n"
1569     ,
1570     "Shows any 'pending' updates for a file, in order.\n"
1571     "The updates shown have not yet been written to the underlying RRD file.\n"
1572   },
1573   {
1574     "FORGET",
1575     handle_request_forget,
1576     CMD_CONTEXT_ANY,
1577     "FORGET <filename>\n"
1578     ,
1579     "Removes the file completely from the cache.\n"
1580     "Any pending updates for the file will be lost.\n"
1581   },
1582   {
1583     "QUEUE",
1584     handle_request_queue,
1585     CMD_CONTEXT_CLIENT,
1586     "QUEUE\n"
1587     ,
1588         "Shows all files in the output queue.\n"
1589     "The output is zero or more lines in the following format:\n"
1590     "(where <num_vals> is the number of values to be written)\n"
1591     "\n"
1592     "<num_vals> <filename>\n"
1593   },
1594   {
1595     "STATS",
1596     handle_request_stats,
1597     CMD_CONTEXT_CLIENT,
1598     "STATS\n"
1599     ,
1600     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1601     "a description of the values.\n"
1602   },
1603   {
1604     "HELP",
1605     handle_request_help,
1606     CMD_CONTEXT_CLIENT,
1607     "HELP [<command>]\n",
1608     NULL, /* special! */
1609   },
1610   {
1611     "BATCH",
1612     batch_start,
1613     CMD_CONTEXT_CLIENT,
1614     "BATCH\n"
1615     ,
1616     "The 'BATCH' command permits the client to initiate a bulk load\n"
1617     "   of commands to rrdcached.\n"
1618     "\n"
1619     "Usage:\n"
1620     "\n"
1621     "    client: BATCH\n"
1622     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1623     "    client: command #1\n"
1624     "    client: command #2\n"
1625     "    client: ... and so on\n"
1626     "    client: .\n"
1627     "    server: 2 errors\n"
1628     "    server: 7 message for command #7\n"
1629     "    server: 9 message for command #9\n"
1630     "\n"
1631     "For more information, consult the rrdcached(1) documentation.\n"
1632   },
1633   {
1634     ".",   /* BATCH terminator */
1635     batch_done,
1636     CMD_CONTEXT_BATCH,
1637     NULL,
1638     NULL
1639   },
1640   {
1641     "QUIT",
1642     handle_request_quit,
1643     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1644     "QUIT\n"
1645     ,
1646     "Disconnect from rrdcached.\n"
1647   }
1648 }; /* }}} command_t list_of_commands[] */
1649 static size_t list_of_commands_len = sizeof (list_of_commands)
1650   / sizeof (list_of_commands[0]);
1651
1652 static command_t *find_command(char *cmd)
1653 {
1654   size_t i;
1655
1656   for (i = 0; i < list_of_commands_len; i++)
1657     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1658       return (&list_of_commands[i]);
1659   return NULL;
1660 }
1661
1662 /* We currently use the index in the `list_of_commands' array as a bit position
1663  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1664  * outside these functions so that switching to a more elegant storage method
1665  * is easily possible. */
1666 static ssize_t find_command_index (const char *cmd) /* {{{ */
1667 {
1668   size_t i;
1669
1670   for (i = 0; i < list_of_commands_len; i++)
1671     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1672       return ((ssize_t) i);
1673   return (-1);
1674 } /* }}} ssize_t find_command_index */
1675
1676 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1677     const char *cmd)
1678 {
1679   ssize_t i;
1680
1681   if (JOURNAL_REPLAY(sock))
1682     return (1);
1683
1684   if (cmd == NULL)
1685     return (-1);
1686
1687   if ((strcasecmp ("QUIT", cmd) == 0)
1688       || (strcasecmp ("HELP", cmd) == 0))
1689     return (1);
1690   else if (strcmp (".", cmd) == 0)
1691     cmd = "BATCH";
1692
1693   i = find_command_index (cmd);
1694   if (i < 0)
1695     return (-1);
1696   assert (i < 32);
1697
1698   if ((sock->permissions & (1 << i)) != 0)
1699     return (1);
1700   return (0);
1701 } /* }}} int socket_permission_check */
1702
1703 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1704     const char *cmd)
1705 {
1706   ssize_t i;
1707
1708   i = find_command_index (cmd);
1709   if (i < 0)
1710     return (-1);
1711   assert (i < 32);
1712
1713   sock->permissions |= (1 << i);
1714   return (0);
1715 } /* }}} int socket_permission_add */
1716
1717 /* check whether commands are received in the expected context */
1718 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1719 {
1720   if (JOURNAL_REPLAY(sock))
1721     return (cmd->context & CMD_CONTEXT_JOURNAL);
1722   else if (sock->batch_start)
1723     return (cmd->context & CMD_CONTEXT_BATCH);
1724   else
1725     return (cmd->context & CMD_CONTEXT_CLIENT);
1726
1727   /* NOTREACHED */
1728   assert(1==0);
1729 }
1730
1731 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1732 {
1733   int status;
1734   char *cmd_str;
1735   char *resp_txt;
1736   command_t *help = NULL;
1737
1738   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1739   if (status == 0)
1740     help = find_command(cmd_str);
1741
1742   if (help && (help->syntax || help->help))
1743   {
1744     char tmp[CMD_MAX];
1745
1746     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1747     resp_txt = tmp;
1748
1749     if (help->syntax)
1750       add_response_info(sock, "Usage: %s\n", help->syntax);
1751
1752     if (help->help)
1753       add_response_info(sock, "%s\n", help->help);
1754   }
1755   else
1756   {
1757     size_t i;
1758
1759     resp_txt = "Command overview\n";
1760
1761     for (i = 0; i < list_of_commands_len; i++)
1762     {
1763       if (list_of_commands[i].syntax == NULL)
1764         continue;
1765       add_response_info (sock, "%s", list_of_commands[i].syntax);
1766     }
1767   }
1768
1769   return send_response(sock, RESP_OK, resp_txt);
1770 } /* }}} int handle_request_help */
1771
1772 static int handle_request (DISPATCH_PROTO) /* {{{ */
1773 {
1774   char *buffer_ptr = buffer;
1775   char *cmd_str = NULL;
1776   command_t *cmd = NULL;
1777   int status;
1778
1779   assert (buffer[buffer_size - 1] == '\0');
1780
1781   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1782   if (status != 0)
1783   {
1784     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1785     return (-1);
1786   }
1787
1788   if (sock != NULL && sock->batch_start)
1789     sock->batch_cmd++;
1790
1791   cmd = find_command(cmd_str);
1792   if (!cmd)
1793     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1794
1795   if (!socket_permission_check (sock, cmd->cmd))
1796     return send_response(sock, RESP_ERR, "Permission denied.\n");
1797
1798   if (!command_check_context(sock, cmd))
1799     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1800
1801   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1802 } /* }}} int handle_request */
1803
1804 static void journal_set_free (journal_set *js) /* {{{ */
1805 {
1806   if (js == NULL)
1807     return;
1808
1809   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1810
1811   free(js);
1812 } /* }}} journal_set_free */
1813
1814 static void journal_set_remove (journal_set *js) /* {{{ */
1815 {
1816   if (js == NULL)
1817     return;
1818
1819   for (uint i=0; i < js->files_num; i++)
1820   {
1821     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1822     unlink(js->files[i]);
1823   }
1824 } /* }}} journal_set_remove */
1825
1826 /* close current journal file handle.
1827  * MUST hold journal_lock before calling */
1828 static void journal_close(void) /* {{{ */
1829 {
1830   if (journal_fh != NULL)
1831   {
1832     if (fclose(journal_fh) != 0)
1833       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1834   }
1835
1836   journal_fh = NULL;
1837   journal_size = 0;
1838 } /* }}} journal_close */
1839
1840 /* MUST hold journal_lock before calling */
1841 static void journal_new_file(void) /* {{{ */
1842 {
1843   struct timeval now;
1844   int  new_fd;
1845   char new_file[PATH_MAX + 1];
1846
1847   assert(journal_dir != NULL);
1848   assert(journal_cur != NULL);
1849
1850   journal_close();
1851
1852   gettimeofday(&now, NULL);
1853   /* this format assures that the files sort in strcmp() order */
1854   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1855            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1856
1857   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1858                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1859   if (new_fd < 0)
1860     goto error;
1861
1862   journal_fh = fdopen(new_fd, "a");
1863   if (journal_fh == NULL)
1864     goto error;
1865
1866   journal_size = ftell(journal_fh);
1867   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1868
1869   /* record the file in the journal set */
1870   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1871
1872   return;
1873
1874 error:
1875   RRDD_LOG(LOG_CRIT,
1876            "JOURNALING DISABLED: Error while trying to create %s : %s",
1877            new_file, rrd_strerror(errno));
1878   RRDD_LOG(LOG_CRIT,
1879            "JOURNALING DISABLED: All values will be flushed at shutdown");
1880
1881   close(new_fd);
1882   config_flush_at_shutdown = 1;
1883
1884 } /* }}} journal_new_file */
1885
1886 /* MUST NOT hold journal_lock before calling this */
1887 static void journal_rotate(void) /* {{{ */
1888 {
1889   journal_set *old_js = NULL;
1890
1891   if (journal_dir == NULL)
1892     return;
1893
1894   RRDD_LOG(LOG_DEBUG, "rotating journals");
1895
1896   pthread_mutex_lock(&stats_lock);
1897   ++stats_journal_rotate;
1898   pthread_mutex_unlock(&stats_lock);
1899
1900   pthread_mutex_lock(&journal_lock);
1901
1902   journal_close();
1903
1904   /* rotate the journal sets */
1905   old_js = journal_old;
1906   journal_old = journal_cur;
1907   journal_cur = calloc(1, sizeof(journal_set));
1908
1909   if (journal_cur != NULL)
1910     journal_new_file();
1911   else
1912     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1913
1914   pthread_mutex_unlock(&journal_lock);
1915
1916   journal_set_remove(old_js);
1917   journal_set_free  (old_js);
1918
1919 } /* }}} static void journal_rotate */
1920
1921 /* MUST hold journal_lock when calling */
1922 static void journal_done(void) /* {{{ */
1923 {
1924   if (journal_cur == NULL)
1925     return;
1926
1927   journal_close();
1928
1929   if (config_flush_at_shutdown)
1930   {
1931     RRDD_LOG(LOG_INFO, "removing journals");
1932     journal_set_remove(journal_old);
1933     journal_set_remove(journal_cur);
1934   }
1935   else
1936   {
1937     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1938              "journals will be used at next startup");
1939   }
1940
1941   journal_set_free(journal_cur);
1942   journal_set_free(journal_old);
1943   free(journal_dir);
1944
1945 } /* }}} static void journal_done */
1946
1947 static int journal_write(char *cmd, char *args) /* {{{ */
1948 {
1949   int chars;
1950
1951   if (journal_fh == NULL)
1952     return 0;
1953
1954   pthread_mutex_lock(&journal_lock);
1955   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1956   journal_size += chars;
1957
1958   if (journal_size > JOURNAL_MAX)
1959     journal_new_file();
1960
1961   pthread_mutex_unlock(&journal_lock);
1962
1963   if (chars > 0)
1964   {
1965     pthread_mutex_lock(&stats_lock);
1966     stats_journal_bytes += chars;
1967     pthread_mutex_unlock(&stats_lock);
1968   }
1969
1970   return chars;
1971 } /* }}} static int journal_write */
1972
1973 static int journal_replay (const char *file) /* {{{ */
1974 {
1975   FILE *fh;
1976   int entry_cnt = 0;
1977   int fail_cnt = 0;
1978   uint64_t line = 0;
1979   char entry[CMD_MAX];
1980   time_t now;
1981
1982   if (file == NULL) return 0;
1983
1984   {
1985     char *reason = "unknown error";
1986     int status = 0;
1987     struct stat statbuf;
1988
1989     memset(&statbuf, 0, sizeof(statbuf));
1990     if (stat(file, &statbuf) != 0)
1991     {
1992       reason = "stat error";
1993       status = errno;
1994     }
1995     else if (!S_ISREG(statbuf.st_mode))
1996     {
1997       reason = "not a regular file";
1998       status = EPERM;
1999     }
2000     if (statbuf.st_uid != daemon_uid)
2001     {
2002       reason = "not owned by daemon user";
2003       status = EACCES;
2004     }
2005     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2006     {
2007       reason = "must not be user/group writable";
2008       status = EACCES;
2009     }
2010
2011     if (status != 0)
2012     {
2013       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2014                file, rrd_strerror(status), reason);
2015       return 0;
2016     }
2017   }
2018
2019   fh = fopen(file, "r");
2020   if (fh == NULL)
2021   {
2022     if (errno != ENOENT)
2023       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2024                file, rrd_strerror(errno));
2025     return 0;
2026   }
2027   else
2028     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2029
2030   now = time(NULL);
2031
2032   while(!feof(fh))
2033   {
2034     size_t entry_len;
2035
2036     ++line;
2037     if (fgets(entry, sizeof(entry), fh) == NULL)
2038       break;
2039     entry_len = strlen(entry);
2040
2041     /* check \n termination in case journal writing crashed mid-line */
2042     if (entry_len == 0)
2043       continue;
2044     else if (entry[entry_len - 1] != '\n')
2045     {
2046       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2047       ++fail_cnt;
2048       continue;
2049     }
2050
2051     entry[entry_len - 1] = '\0';
2052
2053     if (handle_request(NULL, now, entry, entry_len) == 0)
2054       ++entry_cnt;
2055     else
2056       ++fail_cnt;
2057   }
2058
2059   fclose(fh);
2060
2061   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2062            entry_cnt, fail_cnt);
2063
2064   return entry_cnt > 0 ? 1 : 0;
2065 } /* }}} static int journal_replay */
2066
2067 static int journal_sort(const void *v1, const void *v2)
2068 {
2069   char **jn1 = (char **) v1;
2070   char **jn2 = (char **) v2;
2071
2072   return strcmp(*jn1,*jn2);
2073 }
2074
2075 static void journal_init(void) /* {{{ */
2076 {
2077   int had_journal = 0;
2078   DIR *dir;
2079   struct dirent *dent;
2080   char path[PATH_MAX+1];
2081
2082   if (journal_dir == NULL) return;
2083
2084   pthread_mutex_lock(&journal_lock);
2085
2086   journal_cur = calloc(1, sizeof(journal_set));
2087   if (journal_cur == NULL)
2088   {
2089     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2090     return;
2091   }
2092
2093   RRDD_LOG(LOG_INFO, "checking for journal files");
2094
2095   /* Handle old journal files during transition.  This gives them the
2096    * correct sort order.  TODO: remove after first release
2097    */
2098   {
2099     char old_path[PATH_MAX+1];
2100     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2101     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2102     rename(old_path, path);
2103
2104     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2105     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2106     rename(old_path, path);
2107   }
2108
2109   dir = opendir(journal_dir);
2110   while ((dent = readdir(dir)) != NULL)
2111   {
2112     /* looks like a journal file? */
2113     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2114       continue;
2115
2116     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2117
2118     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2119     {
2120       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2121                dent->d_name);
2122       break;
2123     }
2124   }
2125   closedir(dir);
2126
2127   qsort(journal_cur->files, journal_cur->files_num,
2128         sizeof(journal_cur->files[0]), journal_sort);
2129
2130   for (uint i=0; i < journal_cur->files_num; i++)
2131     had_journal += journal_replay(journal_cur->files[i]);
2132
2133   journal_new_file();
2134
2135   /* it must have been a crash.  start a flush */
2136   if (had_journal && config_flush_at_shutdown)
2137     flush_old_values(-1);
2138
2139   pthread_mutex_unlock(&journal_lock);
2140
2141   RRDD_LOG(LOG_INFO, "journal processing complete");
2142
2143 } /* }}} static void journal_init */
2144
2145 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2146 {
2147   assert(sock != NULL);
2148
2149   free(sock->rbuf);  sock->rbuf = NULL;
2150   free(sock->wbuf);  sock->wbuf = NULL;
2151   free(sock);
2152 } /* }}} void free_listen_socket */
2153
2154 static void close_connection(listen_socket_t *sock) /* {{{ */
2155 {
2156   if (sock->fd >= 0)
2157   {
2158     close(sock->fd);
2159     sock->fd = -1;
2160   }
2161
2162   free_listen_socket(sock);
2163
2164 } /* }}} void close_connection */
2165
2166 static void *connection_thread_main (void *args) /* {{{ */
2167 {
2168   listen_socket_t *sock;
2169   int fd;
2170
2171   sock = (listen_socket_t *) args;
2172   fd = sock->fd;
2173
2174   /* init read buffers */
2175   sock->next_read = sock->next_cmd = 0;
2176   sock->rbuf = malloc(RBUF_SIZE);
2177   if (sock->rbuf == NULL)
2178   {
2179     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2180     close_connection(sock);
2181     return NULL;
2182   }
2183
2184   pthread_mutex_lock (&connection_threads_lock);
2185   connection_threads_num++;
2186   pthread_mutex_unlock (&connection_threads_lock);
2187
2188   while (state == RUNNING)
2189   {
2190     char *cmd;
2191     ssize_t cmd_len;
2192     ssize_t rbytes;
2193     time_t now;
2194
2195     struct pollfd pollfd;
2196     int status;
2197
2198     pollfd.fd = fd;
2199     pollfd.events = POLLIN | POLLPRI;
2200     pollfd.revents = 0;
2201
2202     status = poll (&pollfd, 1, /* timeout = */ 500);
2203     if (state != RUNNING)
2204       break;
2205     else if (status == 0) /* timeout */
2206       continue;
2207     else if (status < 0) /* error */
2208     {
2209       status = errno;
2210       if (status != EINTR)
2211         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2212       continue;
2213     }
2214
2215     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2216       break;
2217     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2218     {
2219       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2220           "poll(2) returned something unexpected: %#04hx",
2221           pollfd.revents);
2222       break;
2223     }
2224
2225     rbytes = read(fd, sock->rbuf + sock->next_read,
2226                   RBUF_SIZE - sock->next_read);
2227     if (rbytes < 0)
2228     {
2229       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2230       break;
2231     }
2232     else if (rbytes == 0)
2233       break; /* eof */
2234
2235     sock->next_read += rbytes;
2236
2237     if (sock->batch_start)
2238       now = sock->batch_start;
2239     else
2240       now = time(NULL);
2241
2242     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2243     {
2244       status = handle_request (sock, now, cmd, cmd_len+1);
2245       if (status != 0)
2246         goto out_close;
2247     }
2248   }
2249
2250 out_close:
2251   close_connection(sock);
2252
2253   /* Remove this thread from the connection threads list */
2254   pthread_mutex_lock (&connection_threads_lock);
2255   connection_threads_num--;
2256   if (connection_threads_num <= 0)
2257     pthread_cond_broadcast(&connection_threads_done);
2258   pthread_mutex_unlock (&connection_threads_lock);
2259
2260   return (NULL);
2261 } /* }}} void *connection_thread_main */
2262
2263 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2264 {
2265   int fd;
2266   struct sockaddr_un sa;
2267   listen_socket_t *temp;
2268   int status;
2269   const char *path;
2270   char *path_copy, *dir;
2271
2272   path = sock->addr;
2273   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2274     path += strlen("unix:");
2275
2276   /* dirname may modify its argument */
2277   path_copy = strdup(path);
2278   if (path_copy == NULL)
2279   {
2280     fprintf(stderr, "rrdcached: strdup(): %s\n",
2281         rrd_strerror(errno));
2282     return (-1);
2283   }
2284
2285   dir = dirname(path_copy);
2286   if (rrd_mkdir_p(dir, 0777) != 0)
2287   {
2288     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2289         dir, rrd_strerror(errno));
2290     return (-1);
2291   }
2292
2293   free(path_copy);
2294
2295   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2296       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2297   if (temp == NULL)
2298   {
2299     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2300     return (-1);
2301   }
2302   listen_fds = temp;
2303   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2304
2305   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2306   if (fd < 0)
2307   {
2308     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2309              rrd_strerror(errno));
2310     return (-1);
2311   }
2312
2313   memset (&sa, 0, sizeof (sa));
2314   sa.sun_family = AF_UNIX;
2315   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2316
2317   /* if we've gotten this far, we own the pid file.  any daemon started
2318    * with the same args must not be alive.  therefore, ensure that we can
2319    * create the socket...
2320    */
2321   unlink(path);
2322
2323   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2324   if (status != 0)
2325   {
2326     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2327              path, rrd_strerror(errno));
2328     close (fd);
2329     return (-1);
2330   }
2331
2332   /* tweak the sockets group ownership */
2333   if (sock->socket_group != (gid_t)-1)
2334   {
2335     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2336          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2337     {
2338       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2339     }
2340   }
2341
2342   status = listen (fd, /* backlog = */ 10);
2343   if (status != 0)
2344   {
2345     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2346              path, rrd_strerror(errno));
2347     close (fd);
2348     unlink (path);
2349     return (-1);
2350   }
2351
2352   listen_fds[listen_fds_num].fd = fd;
2353   listen_fds[listen_fds_num].family = PF_UNIX;
2354   strncpy(listen_fds[listen_fds_num].addr, path,
2355           sizeof (listen_fds[listen_fds_num].addr) - 1);
2356   listen_fds_num++;
2357
2358   return (0);
2359 } /* }}} int open_listen_socket_unix */
2360
2361 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2362 {
2363   struct addrinfo ai_hints;
2364   struct addrinfo *ai_res;
2365   struct addrinfo *ai_ptr;
2366   char addr_copy[NI_MAXHOST];
2367   char *addr;
2368   char *port;
2369   int status;
2370
2371   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2372   addr_copy[sizeof (addr_copy) - 1] = 0;
2373   addr = addr_copy;
2374
2375   memset (&ai_hints, 0, sizeof (ai_hints));
2376   ai_hints.ai_flags = 0;
2377 #ifdef AI_ADDRCONFIG
2378   ai_hints.ai_flags |= AI_ADDRCONFIG;
2379 #endif
2380   ai_hints.ai_family = AF_UNSPEC;
2381   ai_hints.ai_socktype = SOCK_STREAM;
2382
2383   port = NULL;
2384   if (*addr == '[') /* IPv6+port format */
2385   {
2386     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2387     addr++;
2388
2389     port = strchr (addr, ']');
2390     if (port == NULL)
2391     {
2392       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2393       return (-1);
2394     }
2395     *port = 0;
2396     port++;
2397
2398     if (*port == ':')
2399       port++;
2400     else if (*port == 0)
2401       port = NULL;
2402     else
2403     {
2404       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2405       return (-1);
2406     }
2407   } /* if (*addr == '[') */
2408   else
2409   {
2410     port = rindex(addr, ':');
2411     if (port != NULL)
2412     {
2413       *port = 0;
2414       port++;
2415     }
2416   }
2417   ai_res = NULL;
2418   status = getaddrinfo (addr,
2419                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2420                         &ai_hints, &ai_res);
2421   if (status != 0)
2422   {
2423     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2424              addr, gai_strerror (status));
2425     return (-1);
2426   }
2427
2428   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2429   {
2430     int fd;
2431     listen_socket_t *temp;
2432     int one = 1;
2433
2434     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2435         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2436     if (temp == NULL)
2437     {
2438       fprintf (stderr,
2439                "rrdcached: open_listen_socket_network: realloc failed.\n");
2440       continue;
2441     }
2442     listen_fds = temp;
2443     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2444
2445     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2446     if (fd < 0)
2447     {
2448       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2449                rrd_strerror(errno));
2450       continue;
2451     }
2452
2453     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2454
2455     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2456     if (status != 0)
2457     {
2458       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2459                sock->addr, rrd_strerror(errno));
2460       close (fd);
2461       continue;
2462     }
2463
2464     status = listen (fd, /* backlog = */ 10);
2465     if (status != 0)
2466     {
2467       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2468                sock->addr, rrd_strerror(errno));
2469       close (fd);
2470       freeaddrinfo(ai_res);
2471       return (-1);
2472     }
2473
2474     listen_fds[listen_fds_num].fd = fd;
2475     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2476     listen_fds_num++;
2477   } /* for (ai_ptr) */
2478
2479   freeaddrinfo(ai_res);
2480   return (0);
2481 } /* }}} static int open_listen_socket_network */
2482
2483 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2484 {
2485   assert(sock != NULL);
2486   assert(sock->addr != NULL);
2487
2488   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2489       || sock->addr[0] == '/')
2490     return (open_listen_socket_unix(sock));
2491   else
2492     return (open_listen_socket_network(sock));
2493 } /* }}} int open_listen_socket */
2494
2495 static int close_listen_sockets (void) /* {{{ */
2496 {
2497   size_t i;
2498
2499   for (i = 0; i < listen_fds_num; i++)
2500   {
2501     close (listen_fds[i].fd);
2502
2503     if (listen_fds[i].family == PF_UNIX)
2504       unlink(listen_fds[i].addr);
2505   }
2506
2507   free (listen_fds);
2508   listen_fds = NULL;
2509   listen_fds_num = 0;
2510
2511   return (0);
2512 } /* }}} int close_listen_sockets */
2513
2514 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2515 {
2516   struct pollfd *pollfds;
2517   int pollfds_num;
2518   int status;
2519   int i;
2520
2521   if (listen_fds_num < 1)
2522   {
2523     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2524     return (NULL);
2525   }
2526
2527   pollfds_num = listen_fds_num;
2528   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2529   if (pollfds == NULL)
2530   {
2531     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2532     return (NULL);
2533   }
2534   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2535
2536   RRDD_LOG(LOG_INFO, "listening for connections");
2537
2538   while (state == RUNNING)
2539   {
2540     for (i = 0; i < pollfds_num; i++)
2541     {
2542       pollfds[i].fd = listen_fds[i].fd;
2543       pollfds[i].events = POLLIN | POLLPRI;
2544       pollfds[i].revents = 0;
2545     }
2546
2547     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2548     if (state != RUNNING)
2549       break;
2550     else if (status == 0) /* timeout */
2551       continue;
2552     else if (status < 0) /* error */
2553     {
2554       status = errno;
2555       if (status != EINTR)
2556       {
2557         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2558       }
2559       continue;
2560     }
2561
2562     for (i = 0; i < pollfds_num; i++)
2563     {
2564       listen_socket_t *client_sock;
2565       struct sockaddr_storage client_sa;
2566       socklen_t client_sa_size;
2567       pthread_t tid;
2568       pthread_attr_t attr;
2569
2570       if (pollfds[i].revents == 0)
2571         continue;
2572
2573       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2574       {
2575         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2576             "poll(2) returned something unexpected for listen FD #%i.",
2577             pollfds[i].fd);
2578         continue;
2579       }
2580
2581       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2582       if (client_sock == NULL)
2583       {
2584         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2585         continue;
2586       }
2587       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2588
2589       client_sa_size = sizeof (client_sa);
2590       client_sock->fd = accept (pollfds[i].fd,
2591           (struct sockaddr *) &client_sa, &client_sa_size);
2592       if (client_sock->fd < 0)
2593       {
2594         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2595         free(client_sock);
2596         continue;
2597       }
2598
2599       pthread_attr_init (&attr);
2600       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2601
2602       status = pthread_create (&tid, &attr, connection_thread_main,
2603                                client_sock);
2604       if (status != 0)
2605       {
2606         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2607         close_connection(client_sock);
2608         continue;
2609       }
2610     } /* for (pollfds_num) */
2611   } /* while (state == RUNNING) */
2612
2613   RRDD_LOG(LOG_INFO, "starting shutdown");
2614
2615   close_listen_sockets ();
2616
2617   pthread_mutex_lock (&connection_threads_lock);
2618   while (connection_threads_num > 0)
2619     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2620   pthread_mutex_unlock (&connection_threads_lock);
2621
2622   free(pollfds);
2623
2624   return (NULL);
2625 } /* }}} void *listen_thread_main */
2626
2627 static int daemonize (void) /* {{{ */
2628 {
2629   int pid_fd;
2630   char *base_dir;
2631
2632   daemon_uid = geteuid();
2633
2634   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2635   if (pid_fd < 0)
2636     pid_fd = check_pidfile();
2637   if (pid_fd < 0)
2638     return pid_fd;
2639
2640   /* open all the listen sockets */
2641   if (config_listen_address_list_len > 0)
2642   {
2643     for (size_t i = 0; i < config_listen_address_list_len; i++)
2644       open_listen_socket (config_listen_address_list[i]);
2645
2646     rrd_free_ptrs((void ***) &config_listen_address_list,
2647                   &config_listen_address_list_len);
2648   }
2649   else
2650   {
2651     listen_socket_t sock;
2652     memset(&sock, 0, sizeof(sock));
2653     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2654     open_listen_socket (&sock);
2655   }
2656
2657   if (listen_fds_num < 1)
2658   {
2659     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2660     goto error;
2661   }
2662
2663   if (!stay_foreground)
2664   {
2665     pid_t child;
2666
2667     child = fork ();
2668     if (child < 0)
2669     {
2670       fprintf (stderr, "daemonize: fork(2) failed.\n");
2671       goto error;
2672     }
2673     else if (child > 0)
2674       exit(0);
2675
2676     /* Become session leader */
2677     setsid ();
2678
2679     /* Open the first three file descriptors to /dev/null */
2680     close (2);
2681     close (1);
2682     close (0);
2683
2684     open ("/dev/null", O_RDWR);
2685     if (dup(0) == -1 || dup(0) == -1){
2686         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2687     }
2688   } /* if (!stay_foreground) */
2689
2690   /* Change into the /tmp directory. */
2691   base_dir = (config_base_dir != NULL)
2692     ? config_base_dir
2693     : "/tmp";
2694
2695   if (chdir (base_dir) != 0)
2696   {
2697     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2698     goto error;
2699   }
2700
2701   install_signal_handlers();
2702
2703   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2704   RRDD_LOG(LOG_INFO, "starting up");
2705
2706   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2707                                 (GDestroyNotify) free_cache_item);
2708   if (cache_tree == NULL)
2709   {
2710     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2711     goto error;
2712   }
2713
2714   return write_pidfile (pid_fd);
2715
2716 error:
2717   remove_pidfile();
2718   return -1;
2719 } /* }}} int daemonize */
2720
2721 static int cleanup (void) /* {{{ */
2722 {
2723   pthread_cond_broadcast (&flush_cond);
2724   pthread_join (flush_thread, NULL);
2725
2726   pthread_cond_broadcast (&queue_cond);
2727   for (int i = 0; i < config_queue_threads; i++)
2728     pthread_join (queue_threads[i], NULL);
2729
2730   if (config_flush_at_shutdown)
2731   {
2732     assert(cache_queue_head == NULL);
2733     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2734   }
2735
2736   free(queue_threads);
2737   free(config_base_dir);
2738
2739   pthread_mutex_lock(&cache_lock);
2740   g_tree_destroy(cache_tree);
2741
2742   pthread_mutex_lock(&journal_lock);
2743   journal_done();
2744
2745   RRDD_LOG(LOG_INFO, "goodbye");
2746   closelog ();
2747
2748   remove_pidfile ();
2749   free(config_pid_file);
2750
2751   return (0);
2752 } /* }}} int cleanup */
2753
2754 static int read_options (int argc, char **argv) /* {{{ */
2755 {
2756   int option;
2757   int status = 0;
2758
2759   char **permissions = NULL;
2760   size_t permissions_len = 0;
2761
2762   gid_t socket_group = (gid_t)-1;
2763
2764   while ((option = getopt(argc, argv, "gl:s:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2765   {
2766     switch (option)
2767     {
2768       case 'g':
2769         stay_foreground=1;
2770         break;
2771
2772       case 'l':
2773       {
2774         listen_socket_t *new;
2775
2776         new = malloc(sizeof(listen_socket_t));
2777         if (new == NULL)
2778         {
2779           fprintf(stderr, "read_options: malloc failed.\n");
2780           return(2);
2781         }
2782         memset(new, 0, sizeof(listen_socket_t));
2783
2784         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2785
2786         /* Add permissions to the socket {{{ */
2787         if (permissions_len != 0)
2788         {
2789           size_t i;
2790           for (i = 0; i < permissions_len; i++)
2791           {
2792             status = socket_permission_add (new, permissions[i]);
2793             if (status != 0)
2794             {
2795               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2796                   "socket failed. Most likely, this permission doesn't "
2797                   "exist. Check your command line.\n", permissions[i]);
2798               status = 4;
2799             }
2800           }
2801         }
2802         else /* if (permissions_len == 0) */
2803         {
2804           /* Add permission for ALL commands to the socket. */
2805           size_t i;
2806           for (i = 0; i < list_of_commands_len; i++)
2807           {
2808             status = socket_permission_add (new, list_of_commands[i].cmd);
2809             if (status != 0)
2810             {
2811               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2812                   "socket failed. This should never happen, ever! Sorry.\n",
2813                   permissions[i]);
2814               status = 4;
2815             }
2816           }
2817         }
2818         /* }}} Done adding permissions. */
2819
2820         new->socket_group = socket_group;
2821
2822         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2823                          &config_listen_address_list_len, new))
2824         {
2825           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2826           return (2);
2827         }
2828       }
2829       break;
2830
2831       /* set socket group permissions */
2832       case 's':
2833       {
2834         gid_t group_gid;
2835         struct group *grp;
2836
2837         group_gid = strtoul(optarg, NULL, 10);
2838         if (errno != EINVAL && group_gid>0)
2839         {
2840           /* we were passed a number */
2841           grp = getgrgid(group_gid);
2842         }
2843         else
2844         {
2845           grp = getgrnam(optarg);
2846         }
2847
2848         if (grp)
2849         {
2850           socket_group = grp->gr_gid;
2851         }
2852         else
2853         {
2854           /* no idea what the user wanted... */
2855           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2856           return (5);
2857         }
2858       }
2859       break;
2860
2861       case 'P':
2862       {
2863         char *optcopy;
2864         char *saveptr;
2865         char *dummy;
2866         char *ptr;
2867
2868         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2869
2870         optcopy = strdup (optarg);
2871         dummy = optcopy;
2872         saveptr = NULL;
2873         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2874         {
2875           dummy = NULL;
2876           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2877         }
2878
2879         free (optcopy);
2880       }
2881       break;
2882
2883       case 'f':
2884       {
2885         int temp;
2886
2887         temp = atoi (optarg);
2888         if (temp > 0)
2889           config_flush_interval = temp;
2890         else
2891         {
2892           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2893           status = 3;
2894         }
2895       }
2896       break;
2897
2898       case 'w':
2899       {
2900         int temp;
2901
2902         temp = atoi (optarg);
2903         if (temp > 0)
2904           config_write_interval = temp;
2905         else
2906         {
2907           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2908           status = 2;
2909         }
2910       }
2911       break;
2912
2913       case 'z':
2914       {
2915         int temp;
2916
2917         temp = atoi(optarg);
2918         if (temp > 0)
2919           config_write_jitter = temp;
2920         else
2921         {
2922           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2923           status = 2;
2924         }
2925
2926         break;
2927       }
2928
2929       case 't':
2930       {
2931         int threads;
2932         threads = atoi(optarg);
2933         if (threads >= 1)
2934           config_queue_threads = threads;
2935         else
2936         {
2937           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2938           return 1;
2939         }
2940       }
2941       break;
2942
2943       case 'B':
2944         config_write_base_only = 1;
2945         break;
2946
2947       case 'b':
2948       {
2949         size_t len;
2950         char base_realpath[PATH_MAX];
2951
2952         if (config_base_dir != NULL)
2953           free (config_base_dir);
2954         config_base_dir = strdup (optarg);
2955         if (config_base_dir == NULL)
2956         {
2957           fprintf (stderr, "read_options: strdup failed.\n");
2958           return (3);
2959         }
2960
2961         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2962         {
2963           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2964               config_base_dir, rrd_strerror (errno));
2965           return (3);
2966         }
2967
2968         /* make sure that the base directory is not resolved via
2969          * symbolic links.  this makes some performance-enhancing
2970          * assumptions possible (we don't have to resolve paths
2971          * that start with a "/")
2972          */
2973         if (realpath(config_base_dir, base_realpath) == NULL)
2974         {
2975           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2976               "%s\n", config_base_dir, rrd_strerror(errno));
2977           return 5;
2978         }
2979
2980         len = strlen (config_base_dir);
2981         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2982         {
2983           config_base_dir[len - 1] = 0;
2984           len--;
2985         }
2986
2987         if (len < 1)
2988         {
2989           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2990           return (4);
2991         }
2992
2993         _config_base_dir_len = len;
2994
2995         len = strlen (base_realpath);
2996         while ((len > 0) && (base_realpath[len - 1] == '/'))
2997         {
2998           base_realpath[len - 1] = '\0';
2999           len--;
3000         }
3001
3002         if (strncmp(config_base_dir,
3003                          base_realpath, sizeof(base_realpath)) != 0)
3004         {
3005           fprintf(stderr,
3006                   "Base directory (-b) resolved via file system links!\n"
3007                   "Please consult rrdcached '-b' documentation!\n"
3008                   "Consider specifying the real directory (%s)\n",
3009                   base_realpath);
3010           return 5;
3011         }
3012       }
3013       break;
3014
3015       case 'p':
3016       {
3017         if (config_pid_file != NULL)
3018           free (config_pid_file);
3019         config_pid_file = strdup (optarg);
3020         if (config_pid_file == NULL)
3021         {
3022           fprintf (stderr, "read_options: strdup failed.\n");
3023           return (3);
3024         }
3025       }
3026       break;
3027
3028       case 'F':
3029         config_flush_at_shutdown = 1;
3030         break;
3031
3032       case 'j':
3033       {
3034         const char *dir = journal_dir = strdup(optarg);
3035
3036         status = rrd_mkdir_p(dir, 0777);
3037         if (status != 0)
3038         {
3039           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3040               dir, rrd_strerror(errno));
3041           return 6;
3042         }
3043
3044         if (access(dir, R_OK|W_OK|X_OK) != 0)
3045         {
3046           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3047                   errno ? rrd_strerror(errno) : "");
3048           return 6;
3049         }
3050       }
3051       break;
3052
3053       case 'h':
3054       case '?':
3055         printf ("RRDCacheD %s\n"
3056             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3057             "\n"
3058             "Usage: rrdcached [options]\n"
3059             "\n"
3060             "Valid options are:\n"
3061             "  -l <address>  Socket address to listen to.\n"
3062             "  -P <perms>    Sets the permissions to assign to all following "
3063                             "sockets\n"
3064             "  -w <seconds>  Interval in which to write data.\n"
3065             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3066             "  -t <threads>  Number of write threads.\n"
3067             "  -f <seconds>  Interval in which to flush dead data.\n"
3068             "  -p <file>     Location of the PID-file.\n"
3069             "  -b <dir>      Base directory to change to.\n"
3070             "  -B            Restrict file access to paths within -b <dir>\n"
3071             "  -g            Do not fork and run in the foreground.\n"
3072             "  -j <dir>      Directory in which to create the journal files.\n"
3073             "  -F            Always flush all updates at shutdown\n"
3074             "  -s <id|name>  Make socket g+rw to named group\n"
3075             "\n"
3076             "For more information and a detailed description of all options "
3077             "please refer\n"
3078             "to the rrdcached(1) manual page.\n",
3079             VERSION);
3080         status = -1;
3081         break;
3082     } /* switch (option) */
3083   } /* while (getopt) */
3084
3085   /* advise the user when values are not sane */
3086   if (config_flush_interval < 2 * config_write_interval)
3087     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3088             " 2x write interval (-w) !\n");
3089   if (config_write_jitter > config_write_interval)
3090     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3091             " write interval (-w) !\n");
3092
3093   if (config_write_base_only && config_base_dir == NULL)
3094     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3095             "  Consult the rrdcached documentation\n");
3096
3097   if (journal_dir == NULL)
3098     config_flush_at_shutdown = 1;
3099
3100   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3101
3102   return (status);
3103 } /* }}} int read_options */
3104
3105 int main (int argc, char **argv)
3106 {
3107   int status;
3108
3109   status = read_options (argc, argv);
3110   if (status != 0)
3111   {
3112     if (status < 0)
3113       status = 0;
3114     return (status);
3115   }
3116
3117   status = daemonize ();
3118   if (status != 0)
3119   {
3120     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3121     return (1);
3122   }
3123
3124   journal_init();
3125
3126   /* start the queue threads */
3127   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3128   if (queue_threads == NULL)
3129   {
3130     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3131     cleanup();
3132     return (1);
3133   }
3134   for (int i = 0; i < config_queue_threads; i++)
3135   {
3136     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3137     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3138     if (status != 0)
3139     {
3140       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3141       cleanup();
3142       return (1);
3143     }
3144   }
3145
3146   /* start the flush thread */
3147   memset(&flush_thread, 0, sizeof(flush_thread));
3148   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3149   if (status != 0)
3150   {
3151     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3152     cleanup();
3153     return (1);
3154   }
3155
3156   listen_thread_main (NULL);
3157   cleanup ();
3158
3159   return (0);
3160 } /* int main */
3161
3162 /*
3163  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3164  */