free config_pid_file after using it for the last time ... Else, the daemon might...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108
109 #include <glib-2.0/glib.h>
110 /* }}} */
111
112 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
113
114 #ifndef __GNUC__
115 # define __attribute__(x) /**/
116 #endif
117
118 /*
119  * Types
120  */
121 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
122
123 struct listen_socket_s
124 {
125   int fd;
126   char addr[PATH_MAX + 1];
127   int family;
128
129   /* state for BATCH processing */
130   time_t batch_start;
131   int batch_cmd;
132
133   /* buffered IO */
134   char *rbuf;
135   off_t next_cmd;
136   off_t next_read;
137
138   char *wbuf;
139   ssize_t wbuf_len;
140
141   uint32_t permissions;
142 };
143 typedef struct listen_socket_s listen_socket_t;
144
145 struct command_s;
146 typedef struct command_s command_t;
147 /* note: guard against "unused" warnings in the handlers */
148 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
149                         time_t now              __attribute__((unused)),\
150                         char  *buffer           __attribute__((unused)),\
151                         size_t buffer_size      __attribute__((unused))
152
153 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
154                         DISPATCH_PROTO
155
156 struct command_s {
157   char   *cmd;
158   int (*handler)(HANDLER_PROTO);
159
160   char  context;                /* where we expect to see it */
161 #define CMD_CONTEXT_CLIENT      (1<<0)
162 #define CMD_CONTEXT_BATCH       (1<<1)
163 #define CMD_CONTEXT_JOURNAL     (1<<2)
164 #define CMD_CONTEXT_ANY         (0x7f)
165
166   char *syntax;
167   char *help;
168 };
169
170 struct cache_item_s;
171 typedef struct cache_item_s cache_item_t;
172 struct cache_item_s
173 {
174   char *file;
175   char **values;
176   size_t values_num;
177   time_t last_flush_time;
178   time_t last_update_stamp;
179 #define CI_FLAGS_IN_TREE  (1<<0)
180 #define CI_FLAGS_IN_QUEUE (1<<1)
181   int flags;
182   pthread_cond_t  flushed;
183   cache_item_t *prev;
184   cache_item_t *next;
185 };
186
187 struct callback_flush_data_s
188 {
189   time_t now;
190   time_t abs_timeout;
191   char **keys;
192   size_t keys_num;
193 };
194 typedef struct callback_flush_data_s callback_flush_data_t;
195
196 enum queue_side_e
197 {
198   HEAD,
199   TAIL
200 };
201 typedef enum queue_side_e queue_side_t;
202
203 /* describe a set of journal files */
204 typedef struct {
205   char **files;
206   size_t files_num;
207 } journal_set;
208
209 /* max length of socket command or response */
210 #define CMD_MAX 4096
211 #define RBUF_SIZE (CMD_MAX*2)
212
213 /*
214  * Variables
215  */
216 static int stay_foreground = 0;
217 static uid_t daemon_uid;
218
219 static listen_socket_t *listen_fds = NULL;
220 static size_t listen_fds_num = 0;
221
222 enum {
223   RUNNING,              /* normal operation */
224   FLUSHING,             /* flushing remaining values */
225   SHUTDOWN              /* shutting down */
226 } state = RUNNING;
227
228 static pthread_t *queue_threads;
229 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
230 static int config_queue_threads = 4;
231
232 static pthread_t flush_thread;
233 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
234
235 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
236 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
237 static int connection_threads_num = 0;
238
239 /* Cache stuff */
240 static GTree          *cache_tree = NULL;
241 static cache_item_t   *cache_queue_head = NULL;
242 static cache_item_t   *cache_queue_tail = NULL;
243 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
244
245 static int config_write_interval = 300;
246 static int config_write_jitter   = 0;
247 static int config_flush_interval = 3600;
248 static int config_flush_at_shutdown = 0;
249 static char *config_pid_file = NULL;
250 static char *config_base_dir = NULL;
251 static size_t _config_base_dir_len = 0;
252 static int config_write_base_only = 0;
253
254 static listen_socket_t **config_listen_address_list = NULL;
255 static size_t config_listen_address_list_len = 0;
256
257 static uint64_t stats_queue_length = 0;
258 static uint64_t stats_updates_received = 0;
259 static uint64_t stats_flush_received = 0;
260 static uint64_t stats_updates_written = 0;
261 static uint64_t stats_data_sets_written = 0;
262 static uint64_t stats_journal_bytes = 0;
263 static uint64_t stats_journal_rotate = 0;
264 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
265
266 /* Journaled updates */
267 #define JOURNAL_BASE "rrd.journal"
268 static journal_set *journal_cur = NULL;
269 static journal_set *journal_old = NULL;
270 static char *journal_dir = NULL;
271 static FILE *journal_fh = NULL;         /* current journal file handle */
272 static long  journal_size = 0;          /* current journal size */
273 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
274 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
275 static int journal_write(char *cmd, char *args);
276 static void journal_done(void);
277 static void journal_rotate(void);
278
279 /* prototypes for forward refernces */
280 static int handle_request_help (HANDLER_PROTO);
281
282 /* 
283  * Functions
284  */
285 static void sig_common (const char *sig) /* {{{ */
286 {
287   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
288   state = FLUSHING;
289   pthread_cond_broadcast(&flush_cond);
290   pthread_cond_broadcast(&queue_cond);
291 } /* }}} void sig_common */
292
293 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
294 {
295   sig_common("INT");
296 } /* }}} void sig_int_handler */
297
298 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
299 {
300   sig_common("TERM");
301 } /* }}} void sig_term_handler */
302
303 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
304 {
305   config_flush_at_shutdown = 1;
306   sig_common("USR1");
307 } /* }}} void sig_usr1_handler */
308
309 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
310 {
311   config_flush_at_shutdown = 0;
312   sig_common("USR2");
313 } /* }}} void sig_usr2_handler */
314
315 static void install_signal_handlers(void) /* {{{ */
316 {
317   /* These structures are static, because `sigaction' behaves weird if the are
318    * overwritten.. */
319   static struct sigaction sa_int;
320   static struct sigaction sa_term;
321   static struct sigaction sa_pipe;
322   static struct sigaction sa_usr1;
323   static struct sigaction sa_usr2;
324
325   /* Install signal handlers */
326   memset (&sa_int, 0, sizeof (sa_int));
327   sa_int.sa_handler = sig_int_handler;
328   sigaction (SIGINT, &sa_int, NULL);
329
330   memset (&sa_term, 0, sizeof (sa_term));
331   sa_term.sa_handler = sig_term_handler;
332   sigaction (SIGTERM, &sa_term, NULL);
333
334   memset (&sa_pipe, 0, sizeof (sa_pipe));
335   sa_pipe.sa_handler = SIG_IGN;
336   sigaction (SIGPIPE, &sa_pipe, NULL);
337
338   memset (&sa_pipe, 0, sizeof (sa_usr1));
339   sa_usr1.sa_handler = sig_usr1_handler;
340   sigaction (SIGUSR1, &sa_usr1, NULL);
341
342   memset (&sa_usr2, 0, sizeof (sa_usr2));
343   sa_usr2.sa_handler = sig_usr2_handler;
344   sigaction (SIGUSR2, &sa_usr2, NULL);
345
346 } /* }}} void install_signal_handlers */
347
348 static int open_pidfile(char *action, int oflag) /* {{{ */
349 {
350   int fd;
351   char *file;
352
353   file = (config_pid_file != NULL)
354     ? config_pid_file
355     : LOCALSTATEDIR "/run/rrdcached.pid";
356
357   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
358   if (fd < 0)
359     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
360             action, file, rrd_strerror(errno));
361
362   return(fd);
363 } /* }}} static int open_pidfile */
364
365 /* check existing pid file to see whether a daemon is running */
366 static int check_pidfile(void)
367 {
368   int pid_fd;
369   pid_t pid;
370   char pid_str[16];
371
372   pid_fd = open_pidfile("open", O_RDWR);
373   if (pid_fd < 0)
374     return pid_fd;
375
376   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
377     return -1;
378
379   pid = atoi(pid_str);
380   if (pid <= 0)
381     return -1;
382
383   /* another running process that we can signal COULD be
384    * a competing rrdcached */
385   if (pid != getpid() && kill(pid, 0) == 0)
386   {
387     fprintf(stderr,
388             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
389     close(pid_fd);
390     return -1;
391   }
392
393   lseek(pid_fd, 0, SEEK_SET);
394   if (ftruncate(pid_fd, 0) == -1)
395   {
396     fprintf(stderr,
397             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
398     close(pid_fd);
399     return -1;
400   }
401
402   fprintf(stderr,
403           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
404           "rrdcached: starting normally.\n", pid);
405
406   return pid_fd;
407 } /* }}} static int check_pidfile */
408
409 static int write_pidfile (int fd) /* {{{ */
410 {
411   pid_t pid;
412   FILE *fh;
413
414   pid = getpid ();
415
416   fh = fdopen (fd, "w");
417   if (fh == NULL)
418   {
419     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
420     close(fd);
421     return (-1);
422   }
423
424   fprintf (fh, "%i\n", (int) pid);
425   fclose (fh);
426
427   return (0);
428 } /* }}} int write_pidfile */
429
430 static int remove_pidfile (void) /* {{{ */
431 {
432   char *file;
433   int status;
434
435   file = (config_pid_file != NULL)
436     ? config_pid_file
437     : LOCALSTATEDIR "/run/rrdcached.pid";
438
439   status = unlink (file);
440   if (status == 0)
441     return (0);
442   return (errno);
443 } /* }}} int remove_pidfile */
444
445 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
446 {
447   char *eol;
448
449   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
450                sock->next_read - sock->next_cmd);
451
452   if (eol == NULL)
453   {
454     /* no commands left, move remainder back to front of rbuf */
455     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
456             sock->next_read - sock->next_cmd);
457     sock->next_read -= sock->next_cmd;
458     sock->next_cmd = 0;
459     *len = 0;
460     return NULL;
461   }
462   else
463   {
464     char *cmd = sock->rbuf + sock->next_cmd;
465     *eol = '\0';
466
467     sock->next_cmd = eol - sock->rbuf + 1;
468
469     if (eol > sock->rbuf && *(eol-1) == '\r')
470       *(--eol) = '\0'; /* handle "\r\n" EOL */
471
472     *len = eol - cmd;
473
474     return cmd;
475   }
476
477   /* NOTREACHED */
478   assert(1==0);
479 } /* }}} char *next_cmd */
480
481 /* add the characters directly to the write buffer */
482 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
483 {
484   char *new_buf;
485
486   assert(sock != NULL);
487
488   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
489   if (new_buf == NULL)
490   {
491     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
492     return -1;
493   }
494
495   strncpy(new_buf + sock->wbuf_len, str, len + 1);
496
497   sock->wbuf = new_buf;
498   sock->wbuf_len += len;
499
500   return 0;
501 } /* }}} static int add_to_wbuf */
502
503 /* add the text to the "extra" info that's sent after the status line */
504 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
505 {
506   va_list argp;
507   char buffer[CMD_MAX];
508   int len;
509
510   if (sock == NULL) return 0; /* journal replay mode */
511   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
512
513   va_start(argp, fmt);
514 #ifdef HAVE_VSNPRINTF
515   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
516 #else
517   len = vsprintf(buffer, fmt, argp);
518 #endif
519   va_end(argp);
520   if (len < 0)
521   {
522     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
523     return -1;
524   }
525
526   return add_to_wbuf(sock, buffer, len);
527 } /* }}} static int add_response_info */
528
529 static int count_lines(char *str) /* {{{ */
530 {
531   int lines = 0;
532
533   if (str != NULL)
534   {
535     while ((str = strchr(str, '\n')) != NULL)
536     {
537       ++lines;
538       ++str;
539     }
540   }
541
542   return lines;
543 } /* }}} static int count_lines */
544
545 /* send the response back to the user.
546  * returns 0 on success, -1 on error
547  * write buffer is always zeroed after this call */
548 static int send_response (listen_socket_t *sock, response_code rc,
549                           char *fmt, ...) /* {{{ */
550 {
551   va_list argp;
552   char buffer[CMD_MAX];
553   int lines;
554   ssize_t wrote;
555   int rclen, len;
556
557   if (sock == NULL) return rc;  /* journal replay mode */
558
559   if (sock->batch_start)
560   {
561     if (rc == RESP_OK)
562       return rc; /* no response on success during BATCH */
563     lines = sock->batch_cmd;
564   }
565   else if (rc == RESP_OK)
566     lines = count_lines(sock->wbuf);
567   else
568     lines = -1;
569
570   rclen = sprintf(buffer, "%d ", lines);
571   va_start(argp, fmt);
572 #ifdef HAVE_VSNPRINTF
573   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
574 #else
575   len = vsprintf(buffer+rclen, fmt, argp);
576 #endif
577   va_end(argp);
578   if (len < 0)
579     return -1;
580
581   len += rclen;
582
583   /* append the result to the wbuf, don't write to the user */
584   if (sock->batch_start)
585     return add_to_wbuf(sock, buffer, len);
586
587   /* first write must be complete */
588   if (len != write(sock->fd, buffer, len))
589   {
590     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
591     return -1;
592   }
593
594   if (sock->wbuf != NULL && rc == RESP_OK)
595   {
596     wrote = 0;
597     while (wrote < sock->wbuf_len)
598     {
599       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
600       if (wb <= 0)
601       {
602         RRDD_LOG(LOG_INFO, "send_response: could not write results");
603         return -1;
604       }
605       wrote += wb;
606     }
607   }
608
609   free(sock->wbuf); sock->wbuf = NULL;
610   sock->wbuf_len = 0;
611
612   return 0;
613 } /* }}} */
614
615 static void wipe_ci_values(cache_item_t *ci, time_t when)
616 {
617   ci->values = NULL;
618   ci->values_num = 0;
619
620   ci->last_flush_time = when;
621   if (config_write_jitter > 0)
622     ci->last_flush_time += (rrd_random() % config_write_jitter);
623 }
624
625 /* remove_from_queue
626  * remove a "cache_item_t" item from the queue.
627  * must hold 'cache_lock' when calling this
628  */
629 static void remove_from_queue(cache_item_t *ci) /* {{{ */
630 {
631   if (ci == NULL) return;
632   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
633
634   if (ci->prev == NULL)
635     cache_queue_head = ci->next; /* reset head */
636   else
637     ci->prev->next = ci->next;
638
639   if (ci->next == NULL)
640     cache_queue_tail = ci->prev; /* reset the tail */
641   else
642     ci->next->prev = ci->prev;
643
644   ci->next = ci->prev = NULL;
645   ci->flags &= ~CI_FLAGS_IN_QUEUE;
646
647   pthread_mutex_lock (&stats_lock);
648   assert (stats_queue_length > 0);
649   stats_queue_length--;
650   pthread_mutex_unlock (&stats_lock);
651
652 } /* }}} static void remove_from_queue */
653
654 /* free the resources associated with the cache_item_t
655  * must hold cache_lock when calling this function
656  */
657 static void *free_cache_item(cache_item_t *ci) /* {{{ */
658 {
659   if (ci == NULL) return NULL;
660
661   remove_from_queue(ci);
662
663   for (size_t i=0; i < ci->values_num; i++)
664     free(ci->values[i]);
665
666   free (ci->values);
667   free (ci->file);
668
669   /* in case anyone is waiting */
670   pthread_cond_broadcast(&ci->flushed);
671   pthread_cond_destroy(&ci->flushed);
672
673   free (ci);
674
675   return NULL;
676 } /* }}} static void *free_cache_item */
677
678 /*
679  * enqueue_cache_item:
680  * `cache_lock' must be acquired before calling this function!
681  */
682 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
683     queue_side_t side)
684 {
685   if (ci == NULL)
686     return (-1);
687
688   if (ci->values_num == 0)
689     return (0);
690
691   if (side == HEAD)
692   {
693     if (cache_queue_head == ci)
694       return 0;
695
696     /* remove if further down in queue */
697     remove_from_queue(ci);
698
699     ci->prev = NULL;
700     ci->next = cache_queue_head;
701     if (ci->next != NULL)
702       ci->next->prev = ci;
703     cache_queue_head = ci;
704
705     if (cache_queue_tail == NULL)
706       cache_queue_tail = cache_queue_head;
707   }
708   else /* (side == TAIL) */
709   {
710     /* We don't move values back in the list.. */
711     if (ci->flags & CI_FLAGS_IN_QUEUE)
712       return (0);
713
714     assert (ci->next == NULL);
715     assert (ci->prev == NULL);
716
717     ci->prev = cache_queue_tail;
718
719     if (cache_queue_tail == NULL)
720       cache_queue_head = ci;
721     else
722       cache_queue_tail->next = ci;
723
724     cache_queue_tail = ci;
725   }
726
727   ci->flags |= CI_FLAGS_IN_QUEUE;
728
729   pthread_cond_signal(&queue_cond);
730   pthread_mutex_lock (&stats_lock);
731   stats_queue_length++;
732   pthread_mutex_unlock (&stats_lock);
733
734   return (0);
735 } /* }}} int enqueue_cache_item */
736
737 /*
738  * tree_callback_flush:
739  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
740  * while this is in progress.
741  */
742 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
743     gpointer data)
744 {
745   cache_item_t *ci;
746   callback_flush_data_t *cfd;
747
748   ci = (cache_item_t *) value;
749   cfd = (callback_flush_data_t *) data;
750
751   if (ci->flags & CI_FLAGS_IN_QUEUE)
752     return FALSE;
753
754   if (ci->values_num > 0
755       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
756   {
757     enqueue_cache_item (ci, TAIL);
758   }
759   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
760       && (ci->values_num <= 0))
761   {
762     assert ((char *) key == ci->file);
763     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
764     {
765       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
766       return (FALSE);
767     }
768   }
769
770   return (FALSE);
771 } /* }}} gboolean tree_callback_flush */
772
773 static int flush_old_values (int max_age)
774 {
775   callback_flush_data_t cfd;
776   size_t k;
777
778   memset (&cfd, 0, sizeof (cfd));
779   /* Pass the current time as user data so that we don't need to call
780    * `time' for each node. */
781   cfd.now = time (NULL);
782   cfd.keys = NULL;
783   cfd.keys_num = 0;
784
785   if (max_age > 0)
786     cfd.abs_timeout = cfd.now - max_age;
787   else
788     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
789
790   /* `tree_callback_flush' will return the keys of all values that haven't
791    * been touched in the last `config_flush_interval' seconds in `cfd'.
792    * The char*'s in this array point to the same memory as ci->file, so we
793    * don't need to free them separately. */
794   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
795
796   for (k = 0; k < cfd.keys_num; k++)
797   {
798     /* should never fail, since we have held the cache_lock
799      * the entire time */
800     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
801   }
802
803   if (cfd.keys != NULL)
804   {
805     free (cfd.keys);
806     cfd.keys = NULL;
807   }
808
809   return (0);
810 } /* int flush_old_values */
811
812 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
813 {
814   struct timeval now;
815   struct timespec next_flush;
816   int status;
817
818   gettimeofday (&now, NULL);
819   next_flush.tv_sec = now.tv_sec + config_flush_interval;
820   next_flush.tv_nsec = 1000 * now.tv_usec;
821
822   pthread_mutex_lock(&cache_lock);
823
824   while (state == RUNNING)
825   {
826     gettimeofday (&now, NULL);
827     if ((now.tv_sec > next_flush.tv_sec)
828         || ((now.tv_sec == next_flush.tv_sec)
829           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
830     {
831       RRDD_LOG(LOG_DEBUG, "flushing old values");
832
833       /* Determine the time of the next cache flush. */
834       next_flush.tv_sec = now.tv_sec + config_flush_interval;
835
836       /* Flush all values that haven't been written in the last
837        * `config_write_interval' seconds. */
838       flush_old_values (config_write_interval);
839
840       /* unlock the cache while we rotate so we don't block incoming
841        * updates if the fsync() blocks on disk I/O */
842       pthread_mutex_unlock(&cache_lock);
843       journal_rotate();
844       pthread_mutex_lock(&cache_lock);
845     }
846
847     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
848     if (status != 0 && status != ETIMEDOUT)
849     {
850       RRDD_LOG (LOG_ERR, "flush_thread_main: "
851                 "pthread_cond_timedwait returned %i.", status);
852     }
853   }
854
855   if (config_flush_at_shutdown)
856     flush_old_values (-1); /* flush everything */
857
858   state = SHUTDOWN;
859
860   pthread_mutex_unlock(&cache_lock);
861
862   return NULL;
863 } /* void *flush_thread_main */
864
865 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
866 {
867   pthread_mutex_lock (&cache_lock);
868
869   while (state != SHUTDOWN
870          || (cache_queue_head != NULL && config_flush_at_shutdown))
871   {
872     cache_item_t *ci;
873     char *file;
874     char **values;
875     size_t values_num;
876     int status;
877
878     /* Now, check if there's something to store away. If not, wait until
879      * something comes in. */
880     if (cache_queue_head == NULL)
881     {
882       status = pthread_cond_wait (&queue_cond, &cache_lock);
883       if ((status != 0) && (status != ETIMEDOUT))
884       {
885         RRDD_LOG (LOG_ERR, "queue_thread_main: "
886             "pthread_cond_wait returned %i.", status);
887       }
888     }
889
890     /* Check if a value has arrived. This may be NULL if we timed out or there
891      * was an interrupt such as a signal. */
892     if (cache_queue_head == NULL)
893       continue;
894
895     ci = cache_queue_head;
896
897     /* copy the relevant parts */
898     file = strdup (ci->file);
899     if (file == NULL)
900     {
901       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
902       continue;
903     }
904
905     assert(ci->values != NULL);
906     assert(ci->values_num > 0);
907
908     values = ci->values;
909     values_num = ci->values_num;
910
911     wipe_ci_values(ci, time(NULL));
912     remove_from_queue(ci);
913
914     pthread_mutex_unlock (&cache_lock);
915
916     rrd_clear_error ();
917     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
918     if (status != 0)
919     {
920       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
921           "rrd_update_r (%s) failed with status %i. (%s)",
922           file, status, rrd_get_error());
923     }
924
925     journal_write("wrote", file);
926
927     /* Search again in the tree.  It's possible someone issued a "FORGET"
928      * while we were writing the update values. */
929     pthread_mutex_lock(&cache_lock);
930     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
931     if (ci)
932       pthread_cond_broadcast(&ci->flushed);
933     pthread_mutex_unlock(&cache_lock);
934
935     if (status == 0)
936     {
937       pthread_mutex_lock (&stats_lock);
938       stats_updates_written++;
939       stats_data_sets_written += values_num;
940       pthread_mutex_unlock (&stats_lock);
941     }
942
943     rrd_free_ptrs((void ***) &values, &values_num);
944     free(file);
945
946     pthread_mutex_lock (&cache_lock);
947   }
948   pthread_mutex_unlock (&cache_lock);
949
950   return (NULL);
951 } /* }}} void *queue_thread_main */
952
953 static int buffer_get_field (char **buffer_ret, /* {{{ */
954     size_t *buffer_size_ret, char **field_ret)
955 {
956   char *buffer;
957   size_t buffer_pos;
958   size_t buffer_size;
959   char *field;
960   size_t field_size;
961   int status;
962
963   buffer = *buffer_ret;
964   buffer_pos = 0;
965   buffer_size = *buffer_size_ret;
966   field = *buffer_ret;
967   field_size = 0;
968
969   if (buffer_size <= 0)
970     return (-1);
971
972   /* This is ensured by `handle_request'. */
973   assert (buffer[buffer_size - 1] == '\0');
974
975   status = -1;
976   while (buffer_pos < buffer_size)
977   {
978     /* Check for end-of-field or end-of-buffer */
979     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
980     {
981       field[field_size] = 0;
982       field_size++;
983       buffer_pos++;
984       status = 0;
985       break;
986     }
987     /* Handle escaped characters. */
988     else if (buffer[buffer_pos] == '\\')
989     {
990       if (buffer_pos >= (buffer_size - 1))
991         break;
992       buffer_pos++;
993       field[field_size] = buffer[buffer_pos];
994       field_size++;
995       buffer_pos++;
996     }
997     /* Normal operation */ 
998     else
999     {
1000       field[field_size] = buffer[buffer_pos];
1001       field_size++;
1002       buffer_pos++;
1003     }
1004   } /* while (buffer_pos < buffer_size) */
1005
1006   if (status != 0)
1007     return (status);
1008
1009   *buffer_ret = buffer + buffer_pos;
1010   *buffer_size_ret = buffer_size - buffer_pos;
1011   *field_ret = field;
1012
1013   return (0);
1014 } /* }}} int buffer_get_field */
1015
1016 /* if we're restricting writes to the base directory,
1017  * check whether the file falls within the dir
1018  * returns 1 if OK, otherwise 0
1019  */
1020 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1021 {
1022   assert(file != NULL);
1023
1024   if (!config_write_base_only
1025       || sock == NULL /* journal replay */
1026       || config_base_dir == NULL)
1027     return 1;
1028
1029   if (strstr(file, "../") != NULL) goto err;
1030
1031   /* relative paths without "../" are ok */
1032   if (*file != '/') return 1;
1033
1034   /* file must be of the format base + "/" + <1+ char filename> */
1035   if (strlen(file) < _config_base_dir_len + 2) goto err;
1036   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1037   if (*(file + _config_base_dir_len) != '/') goto err;
1038
1039   return 1;
1040
1041 err:
1042   if (sock != NULL && sock->fd >= 0)
1043     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1044
1045   return 0;
1046 } /* }}} static int check_file_access */
1047
1048 /* when using a base dir, convert relative paths to absolute paths.
1049  * if necessary, modifies the "filename" pointer to point
1050  * to the new path created in "tmp".  "tmp" is provided
1051  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1052  *
1053  * this allows us to optimize for the expected case (absolute path)
1054  * with a no-op.
1055  */
1056 static void get_abs_path(char **filename, char *tmp)
1057 {
1058   assert(tmp != NULL);
1059   assert(filename != NULL && *filename != NULL);
1060
1061   if (config_base_dir == NULL || **filename == '/')
1062     return;
1063
1064   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1065   *filename = tmp;
1066 } /* }}} static int get_abs_path */
1067
1068 static int flush_file (const char *filename) /* {{{ */
1069 {
1070   cache_item_t *ci;
1071
1072   pthread_mutex_lock (&cache_lock);
1073
1074   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1075   if (ci == NULL)
1076   {
1077     pthread_mutex_unlock (&cache_lock);
1078     return (ENOENT);
1079   }
1080
1081   if (ci->values_num > 0)
1082   {
1083     /* Enqueue at head */
1084     enqueue_cache_item (ci, HEAD);
1085     pthread_cond_wait(&ci->flushed, &cache_lock);
1086   }
1087
1088   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1089    * may have been purged during our cond_wait() */
1090
1091   pthread_mutex_unlock(&cache_lock);
1092
1093   return (0);
1094 } /* }}} int flush_file */
1095
1096 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1097 {
1098   char *err = "Syntax error.\n";
1099
1100   if (cmd && cmd->syntax)
1101     err = cmd->syntax;
1102
1103   return send_response(sock, RESP_ERR, "Usage: %s", err);
1104 } /* }}} static int syntax_error() */
1105
1106 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1107 {
1108   uint64_t copy_queue_length;
1109   uint64_t copy_updates_received;
1110   uint64_t copy_flush_received;
1111   uint64_t copy_updates_written;
1112   uint64_t copy_data_sets_written;
1113   uint64_t copy_journal_bytes;
1114   uint64_t copy_journal_rotate;
1115
1116   uint64_t tree_nodes_number;
1117   uint64_t tree_depth;
1118
1119   pthread_mutex_lock (&stats_lock);
1120   copy_queue_length       = stats_queue_length;
1121   copy_updates_received   = stats_updates_received;
1122   copy_flush_received     = stats_flush_received;
1123   copy_updates_written    = stats_updates_written;
1124   copy_data_sets_written  = stats_data_sets_written;
1125   copy_journal_bytes      = stats_journal_bytes;
1126   copy_journal_rotate     = stats_journal_rotate;
1127   pthread_mutex_unlock (&stats_lock);
1128
1129   pthread_mutex_lock (&cache_lock);
1130   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1131   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1132   pthread_mutex_unlock (&cache_lock);
1133
1134   add_response_info(sock,
1135                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1136   add_response_info(sock,
1137                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1138   add_response_info(sock,
1139                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1140   add_response_info(sock,
1141                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1142   add_response_info(sock,
1143                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1144   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1145   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1146   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1147   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1148
1149   send_response(sock, RESP_OK, "Statistics follow\n");
1150
1151   return (0);
1152 } /* }}} int handle_request_stats */
1153
1154 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1155 {
1156   char *file, file_tmp[PATH_MAX];
1157   int status;
1158
1159   status = buffer_get_field (&buffer, &buffer_size, &file);
1160   if (status != 0)
1161   {
1162     return syntax_error(sock,cmd);
1163   }
1164   else
1165   {
1166     pthread_mutex_lock(&stats_lock);
1167     stats_flush_received++;
1168     pthread_mutex_unlock(&stats_lock);
1169
1170     get_abs_path(&file, file_tmp);
1171     if (!check_file_access(file, sock)) return 0;
1172
1173     status = flush_file (file);
1174     if (status == 0)
1175       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1176     else if (status == ENOENT)
1177     {
1178       /* no file in our tree; see whether it exists at all */
1179       struct stat statbuf;
1180
1181       memset(&statbuf, 0, sizeof(statbuf));
1182       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1183         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1184       else
1185         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1186     }
1187     else if (status < 0)
1188       return send_response(sock, RESP_ERR, "Internal error.\n");
1189     else
1190       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1191   }
1192
1193   /* NOTREACHED */
1194   assert(1==0);
1195 } /* }}} int handle_request_flush */
1196
1197 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1198 {
1199   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1200
1201   pthread_mutex_lock(&cache_lock);
1202   flush_old_values(-1);
1203   pthread_mutex_unlock(&cache_lock);
1204
1205   return send_response(sock, RESP_OK, "Started flush.\n");
1206 } /* }}} static int handle_request_flushall */
1207
1208 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1209 {
1210   int status;
1211   char *file, file_tmp[PATH_MAX];
1212   cache_item_t *ci;
1213
1214   status = buffer_get_field(&buffer, &buffer_size, &file);
1215   if (status != 0)
1216     return syntax_error(sock,cmd);
1217
1218   get_abs_path(&file, file_tmp);
1219
1220   pthread_mutex_lock(&cache_lock);
1221   ci = g_tree_lookup(cache_tree, file);
1222   if (ci == NULL)
1223   {
1224     pthread_mutex_unlock(&cache_lock);
1225     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1226   }
1227
1228   for (size_t i=0; i < ci->values_num; i++)
1229     add_response_info(sock, "%s\n", ci->values[i]);
1230
1231   pthread_mutex_unlock(&cache_lock);
1232   return send_response(sock, RESP_OK, "updates pending\n");
1233 } /* }}} static int handle_request_pending */
1234
1235 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1236 {
1237   int status;
1238   gboolean found;
1239   char *file, file_tmp[PATH_MAX];
1240
1241   status = buffer_get_field(&buffer, &buffer_size, &file);
1242   if (status != 0)
1243     return syntax_error(sock,cmd);
1244
1245   get_abs_path(&file, file_tmp);
1246   if (!check_file_access(file, sock)) return 0;
1247
1248   pthread_mutex_lock(&cache_lock);
1249   found = g_tree_remove(cache_tree, file);
1250   pthread_mutex_unlock(&cache_lock);
1251
1252   if (found == TRUE)
1253   {
1254     if (sock != NULL)
1255       journal_write("forget", file);
1256
1257     return send_response(sock, RESP_OK, "Gone!\n");
1258   }
1259   else
1260     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261
1262   /* NOTREACHED */
1263   assert(1==0);
1264 } /* }}} static int handle_request_forget */
1265
1266 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1267 {
1268   cache_item_t *ci;
1269
1270   pthread_mutex_lock(&cache_lock);
1271
1272   ci = cache_queue_head;
1273   while (ci != NULL)
1274   {
1275     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1276     ci = ci->next;
1277   }
1278
1279   pthread_mutex_unlock(&cache_lock);
1280
1281   return send_response(sock, RESP_OK, "in queue.\n");
1282 } /* }}} int handle_request_queue */
1283
1284 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1285 {
1286   char *file, file_tmp[PATH_MAX];
1287   int values_num = 0;
1288   int status;
1289   char orig_buf[CMD_MAX];
1290
1291   cache_item_t *ci;
1292
1293   /* save it for the journal later */
1294   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1295
1296   status = buffer_get_field (&buffer, &buffer_size, &file);
1297   if (status != 0)
1298     return syntax_error(sock,cmd);
1299
1300   pthread_mutex_lock(&stats_lock);
1301   stats_updates_received++;
1302   pthread_mutex_unlock(&stats_lock);
1303
1304   get_abs_path(&file, file_tmp);
1305   if (!check_file_access(file, sock)) return 0;
1306
1307   pthread_mutex_lock (&cache_lock);
1308   ci = g_tree_lookup (cache_tree, file);
1309
1310   if (ci == NULL) /* {{{ */
1311   {
1312     struct stat statbuf;
1313     cache_item_t *tmp;
1314
1315     /* don't hold the lock while we setup; stat(2) might block */
1316     pthread_mutex_unlock(&cache_lock);
1317
1318     memset (&statbuf, 0, sizeof (statbuf));
1319     status = stat (file, &statbuf);
1320     if (status != 0)
1321     {
1322       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1323
1324       status = errno;
1325       if (status == ENOENT)
1326         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1327       else
1328         return send_response(sock, RESP_ERR,
1329                              "stat failed with error %i.\n", status);
1330     }
1331     if (!S_ISREG (statbuf.st_mode))
1332       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1333
1334     if (access(file, R_OK|W_OK) != 0)
1335       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1336                            file, rrd_strerror(errno));
1337
1338     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1339     if (ci == NULL)
1340     {
1341       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1342
1343       return send_response(sock, RESP_ERR, "malloc failed.\n");
1344     }
1345     memset (ci, 0, sizeof (cache_item_t));
1346
1347     ci->file = strdup (file);
1348     if (ci->file == NULL)
1349     {
1350       free (ci);
1351       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1352
1353       return send_response(sock, RESP_ERR, "strdup failed.\n");
1354     }
1355
1356     wipe_ci_values(ci, now);
1357     ci->flags = CI_FLAGS_IN_TREE;
1358     pthread_cond_init(&ci->flushed, NULL);
1359
1360     pthread_mutex_lock(&cache_lock);
1361
1362     /* another UPDATE might have added this entry in the meantime */
1363     tmp = g_tree_lookup (cache_tree, file);
1364     if (tmp == NULL)
1365       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1366     else
1367     {
1368       free_cache_item (ci);
1369       ci = tmp;
1370     }
1371
1372     /* state may have changed while we were unlocked */
1373     if (state == SHUTDOWN)
1374       return -1;
1375   } /* }}} */
1376   assert (ci != NULL);
1377
1378   /* don't re-write updates in replay mode */
1379   if (sock != NULL)
1380     journal_write("update", orig_buf);
1381
1382   while (buffer_size > 0)
1383   {
1384     char *value;
1385     time_t stamp;
1386     char *eostamp;
1387
1388     status = buffer_get_field (&buffer, &buffer_size, &value);
1389     if (status != 0)
1390     {
1391       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1392       break;
1393     }
1394
1395     /* make sure update time is always moving forward */
1396     stamp = strtol(value, &eostamp, 10);
1397     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1398     {
1399       pthread_mutex_unlock(&cache_lock);
1400       return send_response(sock, RESP_ERR,
1401                            "Cannot find timestamp in '%s'!\n", value);
1402     }
1403     else if (stamp <= ci->last_update_stamp)
1404     {
1405       pthread_mutex_unlock(&cache_lock);
1406       return send_response(sock, RESP_ERR,
1407                            "illegal attempt to update using time %ld when last"
1408                            " update time is %ld (minimum one second step)\n",
1409                            stamp, ci->last_update_stamp);
1410     }
1411     else
1412       ci->last_update_stamp = stamp;
1413
1414     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1415     {
1416       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1417       continue;
1418     }
1419
1420     values_num++;
1421   }
1422
1423   if (((now - ci->last_flush_time) >= config_write_interval)
1424       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425       && (ci->values_num > 0))
1426   {
1427     enqueue_cache_item (ci, TAIL);
1428   }
1429
1430   pthread_mutex_unlock (&cache_lock);
1431
1432   if (values_num < 1)
1433     return send_response(sock, RESP_ERR, "No values updated.\n");
1434   else
1435     return send_response(sock, RESP_OK,
1436                          "errors, enqueued %i value(s).\n", values_num);
1437
1438   /* NOTREACHED */
1439   assert(1==0);
1440
1441 } /* }}} int handle_request_update */
1442
1443 /* we came across a "WROTE" entry during journal replay.
1444  * throw away any values that we have accumulated for this file
1445  */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1447 {
1448   cache_item_t *ci;
1449   const char *file = buffer;
1450
1451   pthread_mutex_lock(&cache_lock);
1452
1453   ci = g_tree_lookup(cache_tree, file);
1454   if (ci == NULL)
1455   {
1456     pthread_mutex_unlock(&cache_lock);
1457     return (0);
1458   }
1459
1460   if (ci->values)
1461     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1462
1463   wipe_ci_values(ci, now);
1464   remove_from_queue(ci);
1465
1466   pthread_mutex_unlock(&cache_lock);
1467   return (0);
1468 } /* }}} int handle_request_wrote */
1469
1470 /* start "BATCH" processing */
1471 static int batch_start (HANDLER_PROTO) /* {{{ */
1472 {
1473   int status;
1474   if (sock->batch_start)
1475     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1476
1477   status = send_response(sock, RESP_OK,
1478                          "Go ahead.  End with dot '.' on its own line.\n");
1479   sock->batch_start = time(NULL);
1480   sock->batch_cmd = 0;
1481
1482   return status;
1483 } /* }}} static int batch_start */
1484
1485 /* finish "BATCH" processing and return results to the client */
1486 static int batch_done (HANDLER_PROTO) /* {{{ */
1487 {
1488   assert(sock->batch_start);
1489   sock->batch_start = 0;
1490   sock->batch_cmd  = 0;
1491   return send_response(sock, RESP_OK, "errors\n");
1492 } /* }}} static int batch_done */
1493
1494 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1495 {
1496   return -1;
1497 } /* }}} static int handle_request_quit */
1498
1499 static command_t list_of_commands[] = { /* {{{ */
1500   {
1501     "UPDATE",
1502     handle_request_update,
1503     CMD_CONTEXT_ANY,
1504     "UPDATE <filename> <values> [<values> ...]\n"
1505     ,
1506     "Adds the given file to the internal cache if it is not yet known and\n"
1507     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1508     "for details.\n"
1509     "\n"
1510     "Each <values> has the following form:\n"
1511     "  <values> = <time>:<value>[:<value>[...]]\n"
1512     "See the rrdupdate(1) manpage for details.\n"
1513   },
1514   {
1515     "WROTE",
1516     handle_request_wrote,
1517     CMD_CONTEXT_JOURNAL,
1518     NULL,
1519     NULL
1520   },
1521   {
1522     "FLUSH",
1523     handle_request_flush,
1524     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1525     "FLUSH <filename>\n"
1526     ,
1527     "Adds the given filename to the head of the update queue and returns\n"
1528     "after it has been dequeued.\n"
1529   },
1530   {
1531     "FLUSHALL",
1532     handle_request_flushall,
1533     CMD_CONTEXT_CLIENT,
1534     "FLUSHALL\n"
1535     ,
1536     "Triggers writing of all pending updates.  Returns immediately.\n"
1537   },
1538   {
1539     "PENDING",
1540     handle_request_pending,
1541     CMD_CONTEXT_CLIENT,
1542     "PENDING <filename>\n"
1543     ,
1544     "Shows any 'pending' updates for a file, in order.\n"
1545     "The updates shown have not yet been written to the underlying RRD file.\n"
1546   },
1547   {
1548     "FORGET",
1549     handle_request_forget,
1550     CMD_CONTEXT_ANY,
1551     "FORGET <filename>\n"
1552     ,
1553     "Removes the file completely from the cache.\n"
1554     "Any pending updates for the file will be lost.\n"
1555   },
1556   {
1557     "QUEUE",
1558     handle_request_queue,
1559     CMD_CONTEXT_CLIENT,
1560     "QUEUE\n"
1561     ,
1562         "Shows all files in the output queue.\n"
1563     "The output is zero or more lines in the following format:\n"
1564     "(where <num_vals> is the number of values to be written)\n"
1565     "\n"
1566     "<num_vals> <filename>\n"
1567   },
1568   {
1569     "STATS",
1570     handle_request_stats,
1571     CMD_CONTEXT_CLIENT,
1572     "STATS\n"
1573     ,
1574     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1575     "a description of the values.\n"
1576   },
1577   {
1578     "HELP",
1579     handle_request_help,
1580     CMD_CONTEXT_CLIENT,
1581     "HELP [<command>]\n",
1582     NULL, /* special! */
1583   },
1584   {
1585     "BATCH",
1586     batch_start,
1587     CMD_CONTEXT_CLIENT,
1588     "BATCH\n"
1589     ,
1590     "The 'BATCH' command permits the client to initiate a bulk load\n"
1591     "   of commands to rrdcached.\n"
1592     "\n"
1593     "Usage:\n"
1594     "\n"
1595     "    client: BATCH\n"
1596     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1597     "    client: command #1\n"
1598     "    client: command #2\n"
1599     "    client: ... and so on\n"
1600     "    client: .\n"
1601     "    server: 2 errors\n"
1602     "    server: 7 message for command #7\n"
1603     "    server: 9 message for command #9\n"
1604     "\n"
1605     "For more information, consult the rrdcached(1) documentation.\n"
1606   },
1607   {
1608     ".",   /* BATCH terminator */
1609     batch_done,
1610     CMD_CONTEXT_BATCH,
1611     NULL,
1612     NULL
1613   },
1614   {
1615     "QUIT",
1616     handle_request_quit,
1617     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1618     "QUIT\n"
1619     ,
1620     "Disconnect from rrdcached.\n"
1621   }
1622 }; /* }}} command_t list_of_commands[] */
1623 static size_t list_of_commands_len = sizeof (list_of_commands)
1624   / sizeof (list_of_commands[0]);
1625
1626 static command_t *find_command(char *cmd)
1627 {
1628   size_t i;
1629
1630   for (i = 0; i < list_of_commands_len; i++)
1631     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1632       return (&list_of_commands[i]);
1633   return NULL;
1634 }
1635
1636 /* We currently use the index in the `list_of_commands' array as a bit position
1637  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1638  * outside these functions so that switching to a more elegant storage method
1639  * is easily possible. */
1640 static ssize_t find_command_index (const char *cmd) /* {{{ */
1641 {
1642   size_t i;
1643
1644   for (i = 0; i < list_of_commands_len; i++)
1645     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1646       return ((ssize_t) i);
1647   return (-1);
1648 } /* }}} ssize_t find_command_index */
1649
1650 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1651     const char *cmd)
1652 {
1653   ssize_t i;
1654
1655   if (cmd == NULL)
1656     return (-1);
1657
1658   if ((strcasecmp ("QUIT", cmd) == 0)
1659       || (strcasecmp ("HELP", cmd) == 0))
1660     return (1);
1661   else if (strcmp (".", cmd) == 0)
1662     cmd = "BATCH";
1663
1664   i = find_command_index (cmd);
1665   if (i < 0)
1666     return (-1);
1667   assert (i < 32);
1668
1669   if ((sock->permissions & (1 << i)) != 0)
1670     return (1);
1671   return (0);
1672 } /* }}} int socket_permission_check */
1673
1674 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1675     const char *cmd)
1676 {
1677   ssize_t i;
1678
1679   i = find_command_index (cmd);
1680   if (i < 0)
1681     return (-1);
1682   assert (i < 32);
1683
1684   sock->permissions |= (1 << i);
1685   return (0);
1686 } /* }}} int socket_permission_add */
1687
1688 /* check whether commands are received in the expected context */
1689 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1690 {
1691   if (sock == NULL)
1692     return (cmd->context & CMD_CONTEXT_JOURNAL);
1693   else if (sock->batch_start)
1694     return (cmd->context & CMD_CONTEXT_BATCH);
1695   else
1696     return (cmd->context & CMD_CONTEXT_CLIENT);
1697
1698   /* NOTREACHED */
1699   assert(1==0);
1700 }
1701
1702 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1703 {
1704   int status;
1705   char *cmd_str;
1706   char *resp_txt;
1707   command_t *help = NULL;
1708
1709   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1710   if (status == 0)
1711     help = find_command(cmd_str);
1712
1713   if (help && (help->syntax || help->help))
1714   {
1715     char tmp[CMD_MAX];
1716
1717     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1718     resp_txt = tmp;
1719
1720     if (help->syntax)
1721       add_response_info(sock, "Usage: %s\n", help->syntax);
1722
1723     if (help->help)
1724       add_response_info(sock, "%s\n", help->help);
1725   }
1726   else
1727   {
1728     size_t i;
1729
1730     resp_txt = "Command overview\n";
1731
1732     for (i = 0; i < list_of_commands_len; i++)
1733     {
1734       if (list_of_commands[i].syntax == NULL)
1735         continue;
1736       add_response_info (sock, "%s", list_of_commands[i].syntax);
1737     }
1738   }
1739
1740   return send_response(sock, RESP_OK, resp_txt);
1741 } /* }}} int handle_request_help */
1742
1743 /* if sock==NULL, we are in journal replay mode */
1744 static int handle_request (DISPATCH_PROTO) /* {{{ */
1745 {
1746   char *buffer_ptr = buffer;
1747   char *cmd_str = NULL;
1748   command_t *cmd = NULL;
1749   int status;
1750
1751   assert (buffer[buffer_size - 1] == '\0');
1752
1753   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1754   if (status != 0)
1755   {
1756     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1757     return (-1);
1758   }
1759
1760   if (sock != NULL && sock->batch_start)
1761     sock->batch_cmd++;
1762
1763   cmd = find_command(cmd_str);
1764   if (!cmd)
1765     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1766
1767   if (!socket_permission_check (sock, cmd->cmd))
1768     return send_response(sock, RESP_ERR, "Permission denied.\n");
1769
1770   if (!command_check_context(sock, cmd))
1771     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1772
1773   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1774 } /* }}} int handle_request */
1775
1776 static void journal_set_free (journal_set *js) /* {{{ */
1777 {
1778   if (js == NULL)
1779     return;
1780
1781   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1782
1783   free(js);
1784 } /* }}} journal_set_free */
1785
1786 static void journal_set_remove (journal_set *js) /* {{{ */
1787 {
1788   if (js == NULL)
1789     return;
1790
1791   for (uint i=0; i < js->files_num; i++)
1792   {
1793     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1794     unlink(js->files[i]);
1795   }
1796 } /* }}} journal_set_remove */
1797
1798 /* close current journal file handle.
1799  * MUST hold journal_lock before calling */
1800 static void journal_close(void) /* {{{ */
1801 {
1802   if (journal_fh != NULL)
1803   {
1804     if (fclose(journal_fh) != 0)
1805       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1806   }
1807
1808   journal_fh = NULL;
1809   journal_size = 0;
1810 } /* }}} journal_close */
1811
1812 /* MUST hold journal_lock before calling */
1813 static void journal_new_file(void) /* {{{ */
1814 {
1815   struct timeval now;
1816   int  new_fd;
1817   char new_file[PATH_MAX + 1];
1818
1819   assert(journal_dir != NULL);
1820   assert(journal_cur != NULL);
1821
1822   journal_close();
1823
1824   gettimeofday(&now, NULL);
1825   /* this format assures that the files sort in strcmp() order */
1826   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1827            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1828
1829   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1830                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1831   if (new_fd < 0)
1832     goto error;
1833
1834   journal_fh = fdopen(new_fd, "a");
1835   if (journal_fh == NULL)
1836     goto error;
1837
1838   journal_size = ftell(journal_fh);
1839   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1840
1841   /* record the file in the journal set */
1842   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1843
1844   return;
1845
1846 error:
1847   RRDD_LOG(LOG_CRIT,
1848            "JOURNALING DISABLED: Error while trying to create %s : %s",
1849            new_file, rrd_strerror(errno));
1850   RRDD_LOG(LOG_CRIT,
1851            "JOURNALING DISABLED: All values will be flushed at shutdown");
1852
1853   close(new_fd);
1854   config_flush_at_shutdown = 1;
1855
1856 } /* }}} journal_new_file */
1857
1858 /* MUST NOT hold journal_lock before calling this */
1859 static void journal_rotate(void) /* {{{ */
1860 {
1861   journal_set *old_js = NULL;
1862
1863   if (journal_dir == NULL)
1864     return;
1865
1866   RRDD_LOG(LOG_DEBUG, "rotating journals");
1867
1868   pthread_mutex_lock(&stats_lock);
1869   ++stats_journal_rotate;
1870   pthread_mutex_unlock(&stats_lock);
1871
1872   pthread_mutex_lock(&journal_lock);
1873
1874   journal_close();
1875
1876   /* rotate the journal sets */
1877   old_js = journal_old;
1878   journal_old = journal_cur;
1879   journal_cur = calloc(1, sizeof(journal_set));
1880
1881   if (journal_cur != NULL)
1882     journal_new_file();
1883   else
1884     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1885
1886   pthread_mutex_unlock(&journal_lock);
1887
1888   journal_set_remove(old_js);
1889   journal_set_free  (old_js);
1890
1891 } /* }}} static void journal_rotate */
1892
1893 /* MUST hold journal_lock when calling */
1894 static void journal_done(void) /* {{{ */
1895 {
1896   if (journal_cur == NULL)
1897     return;
1898
1899   journal_close();
1900
1901   if (config_flush_at_shutdown)
1902   {
1903     RRDD_LOG(LOG_INFO, "removing journals");
1904     journal_set_remove(journal_old);
1905     journal_set_remove(journal_cur);
1906   }
1907   else
1908   {
1909     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1910              "journals will be used at next startup");
1911   }
1912
1913   journal_set_free(journal_cur);
1914   journal_set_free(journal_old);
1915   free(journal_dir);
1916
1917 } /* }}} static void journal_done */
1918
1919 static int journal_write(char *cmd, char *args) /* {{{ */
1920 {
1921   int chars;
1922
1923   if (journal_fh == NULL)
1924     return 0;
1925
1926   pthread_mutex_lock(&journal_lock);
1927   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1928   journal_size += chars;
1929
1930   if (journal_size > JOURNAL_MAX)
1931     journal_new_file();
1932
1933   pthread_mutex_unlock(&journal_lock);
1934
1935   if (chars > 0)
1936   {
1937     pthread_mutex_lock(&stats_lock);
1938     stats_journal_bytes += chars;
1939     pthread_mutex_unlock(&stats_lock);
1940   }
1941
1942   return chars;
1943 } /* }}} static int journal_write */
1944
1945 static int journal_replay (const char *file) /* {{{ */
1946 {
1947   FILE *fh;
1948   int entry_cnt = 0;
1949   int fail_cnt = 0;
1950   uint64_t line = 0;
1951   char entry[CMD_MAX];
1952   time_t now;
1953
1954   if (file == NULL) return 0;
1955
1956   {
1957     char *reason = "unknown error";
1958     int status = 0;
1959     struct stat statbuf;
1960
1961     memset(&statbuf, 0, sizeof(statbuf));
1962     if (stat(file, &statbuf) != 0)
1963     {
1964       reason = "stat error";
1965       status = errno;
1966     }
1967     else if (!S_ISREG(statbuf.st_mode))
1968     {
1969       reason = "not a regular file";
1970       status = EPERM;
1971     }
1972     if (statbuf.st_uid != daemon_uid)
1973     {
1974       reason = "not owned by daemon user";
1975       status = EACCES;
1976     }
1977     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1978     {
1979       reason = "must not be user/group writable";
1980       status = EACCES;
1981     }
1982
1983     if (status != 0)
1984     {
1985       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1986                file, rrd_strerror(status), reason);
1987       return 0;
1988     }
1989   }
1990
1991   fh = fopen(file, "r");
1992   if (fh == NULL)
1993   {
1994     if (errno != ENOENT)
1995       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1996                file, rrd_strerror(errno));
1997     return 0;
1998   }
1999   else
2000     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2001
2002   now = time(NULL);
2003
2004   while(!feof(fh))
2005   {
2006     size_t entry_len;
2007
2008     ++line;
2009     if (fgets(entry, sizeof(entry), fh) == NULL)
2010       break;
2011     entry_len = strlen(entry);
2012
2013     /* check \n termination in case journal writing crashed mid-line */
2014     if (entry_len == 0)
2015       continue;
2016     else if (entry[entry_len - 1] != '\n')
2017     {
2018       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2019       ++fail_cnt;
2020       continue;
2021     }
2022
2023     entry[entry_len - 1] = '\0';
2024
2025     if (handle_request(NULL, now, entry, entry_len) == 0)
2026       ++entry_cnt;
2027     else
2028       ++fail_cnt;
2029   }
2030
2031   fclose(fh);
2032
2033   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2034            entry_cnt, fail_cnt);
2035
2036   return entry_cnt > 0 ? 1 : 0;
2037 } /* }}} static int journal_replay */
2038
2039 static int journal_sort(const void *v1, const void *v2)
2040 {
2041   char **jn1 = (char **) v1;
2042   char **jn2 = (char **) v2;
2043
2044   return strcmp(*jn1,*jn2);
2045 }
2046
2047 static void journal_init(void) /* {{{ */
2048 {
2049   int had_journal = 0;
2050   DIR *dir;
2051   struct dirent *dent;
2052   char path[PATH_MAX+1];
2053
2054   if (journal_dir == NULL) return;
2055
2056   pthread_mutex_lock(&journal_lock);
2057
2058   journal_cur = calloc(1, sizeof(journal_set));
2059   if (journal_cur == NULL)
2060   {
2061     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2062     return;
2063   }
2064
2065   RRDD_LOG(LOG_INFO, "checking for journal files");
2066
2067   /* Handle old journal files during transition.  This gives them the
2068    * correct sort order.  TODO: remove after first release
2069    */
2070   {
2071     char old_path[PATH_MAX+1];
2072     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2073     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2074     rename(old_path, path);
2075
2076     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2077     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2078     rename(old_path, path);
2079   }
2080
2081   dir = opendir(journal_dir);
2082   while ((dent = readdir(dir)) != NULL)
2083   {
2084     /* looks like a journal file? */
2085     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2086       continue;
2087
2088     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2089
2090     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2091     {
2092       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2093                dent->d_name);
2094       break;
2095     }
2096   }
2097   closedir(dir);
2098
2099   qsort(journal_cur->files, journal_cur->files_num,
2100         sizeof(journal_cur->files[0]), journal_sort);
2101
2102   for (uint i=0; i < journal_cur->files_num; i++)
2103     had_journal += journal_replay(journal_cur->files[i]);
2104
2105   journal_new_file();
2106
2107   /* it must have been a crash.  start a flush */
2108   if (had_journal && config_flush_at_shutdown)
2109     flush_old_values(-1);
2110
2111   pthread_mutex_unlock(&journal_lock);
2112
2113   RRDD_LOG(LOG_INFO, "journal processing complete");
2114
2115 } /* }}} static void journal_init */
2116
2117 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2118 {
2119   assert(sock != NULL);
2120
2121   free(sock->rbuf);  sock->rbuf = NULL;
2122   free(sock->wbuf);  sock->wbuf = NULL;
2123   free(sock);
2124 } /* }}} void free_listen_socket */
2125
2126 static void close_connection(listen_socket_t *sock) /* {{{ */
2127 {
2128   if (sock->fd >= 0)
2129   {
2130     close(sock->fd);
2131     sock->fd = -1;
2132   }
2133
2134   free_listen_socket(sock);
2135
2136 } /* }}} void close_connection */
2137
2138 static void *connection_thread_main (void *args) /* {{{ */
2139 {
2140   listen_socket_t *sock;
2141   int fd;
2142
2143   sock = (listen_socket_t *) args;
2144   fd = sock->fd;
2145
2146   /* init read buffers */
2147   sock->next_read = sock->next_cmd = 0;
2148   sock->rbuf = malloc(RBUF_SIZE);
2149   if (sock->rbuf == NULL)
2150   {
2151     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2152     close_connection(sock);
2153     return NULL;
2154   }
2155
2156   pthread_mutex_lock (&connection_threads_lock);
2157   connection_threads_num++;
2158   pthread_mutex_unlock (&connection_threads_lock);
2159
2160   while (state == RUNNING)
2161   {
2162     char *cmd;
2163     ssize_t cmd_len;
2164     ssize_t rbytes;
2165     time_t now;
2166
2167     struct pollfd pollfd;
2168     int status;
2169
2170     pollfd.fd = fd;
2171     pollfd.events = POLLIN | POLLPRI;
2172     pollfd.revents = 0;
2173
2174     status = poll (&pollfd, 1, /* timeout = */ 500);
2175     if (state != RUNNING)
2176       break;
2177     else if (status == 0) /* timeout */
2178       continue;
2179     else if (status < 0) /* error */
2180     {
2181       status = errno;
2182       if (status != EINTR)
2183         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2184       continue;
2185     }
2186
2187     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2188       break;
2189     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2190     {
2191       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2192           "poll(2) returned something unexpected: %#04hx",
2193           pollfd.revents);
2194       break;
2195     }
2196
2197     rbytes = read(fd, sock->rbuf + sock->next_read,
2198                   RBUF_SIZE - sock->next_read);
2199     if (rbytes < 0)
2200     {
2201       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2202       break;
2203     }
2204     else if (rbytes == 0)
2205       break; /* eof */
2206
2207     sock->next_read += rbytes;
2208
2209     if (sock->batch_start)
2210       now = sock->batch_start;
2211     else
2212       now = time(NULL);
2213
2214     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2215     {
2216       status = handle_request (sock, now, cmd, cmd_len+1);
2217       if (status != 0)
2218         goto out_close;
2219     }
2220   }
2221
2222 out_close:
2223   close_connection(sock);
2224
2225   /* Remove this thread from the connection threads list */
2226   pthread_mutex_lock (&connection_threads_lock);
2227   connection_threads_num--;
2228   if (connection_threads_num <= 0)
2229     pthread_cond_broadcast(&connection_threads_done);
2230   pthread_mutex_unlock (&connection_threads_lock);
2231
2232   return (NULL);
2233 } /* }}} void *connection_thread_main */
2234
2235 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2236 {
2237   int fd;
2238   struct sockaddr_un sa;
2239   listen_socket_t *temp;
2240   int status;
2241   const char *path;
2242
2243   path = sock->addr;
2244   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2245     path += strlen("unix:");
2246
2247   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2248       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2249   if (temp == NULL)
2250   {
2251     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2252     return (-1);
2253   }
2254   listen_fds = temp;
2255   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2256
2257   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2258   if (fd < 0)
2259   {
2260     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2261              rrd_strerror(errno));
2262     return (-1);
2263   }
2264
2265   memset (&sa, 0, sizeof (sa));
2266   sa.sun_family = AF_UNIX;
2267   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2268
2269   /* if we've gotten this far, we own the pid file.  any daemon started
2270    * with the same args must not be alive.  therefore, ensure that we can
2271    * create the socket...
2272    */
2273   unlink(path);
2274
2275   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2276   if (status != 0)
2277   {
2278     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2279              path, rrd_strerror(errno));
2280     close (fd);
2281     return (-1);
2282   }
2283
2284   status = listen (fd, /* backlog = */ 10);
2285   if (status != 0)
2286   {
2287     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2288              path, rrd_strerror(errno));
2289     close (fd);
2290     unlink (path);
2291     return (-1);
2292   }
2293
2294   listen_fds[listen_fds_num].fd = fd;
2295   listen_fds[listen_fds_num].family = PF_UNIX;
2296   strncpy(listen_fds[listen_fds_num].addr, path,
2297           sizeof (listen_fds[listen_fds_num].addr) - 1);
2298   listen_fds_num++;
2299
2300   return (0);
2301 } /* }}} int open_listen_socket_unix */
2302
2303 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2304 {
2305   struct addrinfo ai_hints;
2306   struct addrinfo *ai_res;
2307   struct addrinfo *ai_ptr;
2308   char addr_copy[NI_MAXHOST];
2309   char *addr;
2310   char *port;
2311   int status;
2312
2313   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2314   addr_copy[sizeof (addr_copy) - 1] = 0;
2315   addr = addr_copy;
2316
2317   memset (&ai_hints, 0, sizeof (ai_hints));
2318   ai_hints.ai_flags = 0;
2319 #ifdef AI_ADDRCONFIG
2320   ai_hints.ai_flags |= AI_ADDRCONFIG;
2321 #endif
2322   ai_hints.ai_family = AF_UNSPEC;
2323   ai_hints.ai_socktype = SOCK_STREAM;
2324
2325   port = NULL;
2326   if (*addr == '[') /* IPv6+port format */
2327   {
2328     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2329     addr++;
2330
2331     port = strchr (addr, ']');
2332     if (port == NULL)
2333     {
2334       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2335       return (-1);
2336     }
2337     *port = 0;
2338     port++;
2339
2340     if (*port == ':')
2341       port++;
2342     else if (*port == 0)
2343       port = NULL;
2344     else
2345     {
2346       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2347       return (-1);
2348     }
2349   } /* if (*addr = ']') */
2350   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2351   {
2352     port = rindex(addr, ':');
2353     if (port != NULL)
2354     {
2355       *port = 0;
2356       port++;
2357     }
2358   }
2359   ai_res = NULL;
2360   status = getaddrinfo (addr,
2361                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2362                         &ai_hints, &ai_res);
2363   if (status != 0)
2364   {
2365     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2366              addr, gai_strerror (status));
2367     return (-1);
2368   }
2369
2370   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2371   {
2372     int fd;
2373     listen_socket_t *temp;
2374     int one = 1;
2375
2376     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2377         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2378     if (temp == NULL)
2379     {
2380       fprintf (stderr,
2381                "rrdcached: open_listen_socket_network: realloc failed.\n");
2382       continue;
2383     }
2384     listen_fds = temp;
2385     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2386
2387     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2388     if (fd < 0)
2389     {
2390       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2391                rrd_strerror(errno));
2392       continue;
2393     }
2394
2395     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2396
2397     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2398     if (status != 0)
2399     {
2400       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2401                sock->addr, rrd_strerror(errno));
2402       close (fd);
2403       continue;
2404     }
2405
2406     status = listen (fd, /* backlog = */ 10);
2407     if (status != 0)
2408     {
2409       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2410                sock->addr, rrd_strerror(errno));
2411       close (fd);
2412       freeaddrinfo(ai_res);
2413       return (-1);
2414     }
2415
2416     listen_fds[listen_fds_num].fd = fd;
2417     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2418     listen_fds_num++;
2419   } /* for (ai_ptr) */
2420
2421   freeaddrinfo(ai_res);
2422   return (0);
2423 } /* }}} static int open_listen_socket_network */
2424
2425 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2426 {
2427   assert(sock != NULL);
2428   assert(sock->addr != NULL);
2429
2430   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2431       || sock->addr[0] == '/')
2432     return (open_listen_socket_unix(sock));
2433   else
2434     return (open_listen_socket_network(sock));
2435 } /* }}} int open_listen_socket */
2436
2437 static int close_listen_sockets (void) /* {{{ */
2438 {
2439   size_t i;
2440
2441   for (i = 0; i < listen_fds_num; i++)
2442   {
2443     close (listen_fds[i].fd);
2444
2445     if (listen_fds[i].family == PF_UNIX)
2446       unlink(listen_fds[i].addr);
2447   }
2448
2449   free (listen_fds);
2450   listen_fds = NULL;
2451   listen_fds_num = 0;
2452
2453   return (0);
2454 } /* }}} int close_listen_sockets */
2455
2456 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2457 {
2458   struct pollfd *pollfds;
2459   int pollfds_num;
2460   int status;
2461   int i;
2462
2463   if (listen_fds_num < 1)
2464   {
2465     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2466     return (NULL);
2467   }
2468
2469   pollfds_num = listen_fds_num;
2470   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2471   if (pollfds == NULL)
2472   {
2473     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2474     return (NULL);
2475   }
2476   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2477
2478   RRDD_LOG(LOG_INFO, "listening for connections");
2479
2480   while (state == RUNNING)
2481   {
2482     for (i = 0; i < pollfds_num; i++)
2483     {
2484       pollfds[i].fd = listen_fds[i].fd;
2485       pollfds[i].events = POLLIN | POLLPRI;
2486       pollfds[i].revents = 0;
2487     }
2488
2489     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2490     if (state != RUNNING)
2491       break;
2492     else if (status == 0) /* timeout */
2493       continue;
2494     else if (status < 0) /* error */
2495     {
2496       status = errno;
2497       if (status != EINTR)
2498       {
2499         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2500       }
2501       continue;
2502     }
2503
2504     for (i = 0; i < pollfds_num; i++)
2505     {
2506       listen_socket_t *client_sock;
2507       struct sockaddr_storage client_sa;
2508       socklen_t client_sa_size;
2509       pthread_t tid;
2510       pthread_attr_t attr;
2511
2512       if (pollfds[i].revents == 0)
2513         continue;
2514
2515       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2516       {
2517         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2518             "poll(2) returned something unexpected for listen FD #%i.",
2519             pollfds[i].fd);
2520         continue;
2521       }
2522
2523       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2524       if (client_sock == NULL)
2525       {
2526         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2527         continue;
2528       }
2529       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2530
2531       client_sa_size = sizeof (client_sa);
2532       client_sock->fd = accept (pollfds[i].fd,
2533           (struct sockaddr *) &client_sa, &client_sa_size);
2534       if (client_sock->fd < 0)
2535       {
2536         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2537         free(client_sock);
2538         continue;
2539       }
2540
2541       pthread_attr_init (&attr);
2542       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2543
2544       status = pthread_create (&tid, &attr, connection_thread_main,
2545                                client_sock);
2546       if (status != 0)
2547       {
2548         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2549         close_connection(client_sock);
2550         continue;
2551       }
2552     } /* for (pollfds_num) */
2553   } /* while (state == RUNNING) */
2554
2555   RRDD_LOG(LOG_INFO, "starting shutdown");
2556
2557   close_listen_sockets ();
2558
2559   pthread_mutex_lock (&connection_threads_lock);
2560   while (connection_threads_num > 0)
2561     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2562   pthread_mutex_unlock (&connection_threads_lock);
2563
2564   free(pollfds);
2565
2566   return (NULL);
2567 } /* }}} void *listen_thread_main */
2568
2569 static int daemonize (void) /* {{{ */
2570 {
2571   int pid_fd;
2572   char *base_dir;
2573
2574   daemon_uid = geteuid();
2575
2576   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2577   if (pid_fd < 0)
2578     pid_fd = check_pidfile();
2579   if (pid_fd < 0)
2580     return pid_fd;
2581
2582   /* open all the listen sockets */
2583   if (config_listen_address_list_len > 0)
2584   {
2585     for (size_t i = 0; i < config_listen_address_list_len; i++)
2586       open_listen_socket (config_listen_address_list[i]);
2587
2588     rrd_free_ptrs((void ***) &config_listen_address_list,
2589                   &config_listen_address_list_len);
2590   }
2591   else
2592   {
2593     listen_socket_t sock;
2594     memset(&sock, 0, sizeof(sock));
2595     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2596     open_listen_socket (&sock);
2597   }
2598
2599   if (listen_fds_num < 1)
2600   {
2601     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2602     goto error;
2603   }
2604
2605   if (!stay_foreground)
2606   {
2607     pid_t child;
2608
2609     child = fork ();
2610     if (child < 0)
2611     {
2612       fprintf (stderr, "daemonize: fork(2) failed.\n");
2613       goto error;
2614     }
2615     else if (child > 0)
2616       exit(0);
2617
2618     /* Become session leader */
2619     setsid ();
2620
2621     /* Open the first three file descriptors to /dev/null */
2622     close (2);
2623     close (1);
2624     close (0);
2625
2626     open ("/dev/null", O_RDWR);
2627     if (dup(0) == -1 || dup(0) == -1){
2628         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2629     }
2630   } /* if (!stay_foreground) */
2631
2632   /* Change into the /tmp directory. */
2633   base_dir = (config_base_dir != NULL)
2634     ? config_base_dir
2635     : "/tmp";
2636
2637   if (chdir (base_dir) != 0)
2638   {
2639     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2640     goto error;
2641   }
2642
2643   install_signal_handlers();
2644
2645   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2646   RRDD_LOG(LOG_INFO, "starting up");
2647
2648   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2649                                 (GDestroyNotify) free_cache_item);
2650   if (cache_tree == NULL)
2651   {
2652     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2653     goto error;
2654   }
2655
2656   return write_pidfile (pid_fd);
2657
2658 error:
2659   remove_pidfile();
2660   return -1;
2661 } /* }}} int daemonize */
2662
2663 static int cleanup (void) /* {{{ */
2664 {
2665   pthread_cond_broadcast (&flush_cond);
2666   pthread_join (flush_thread, NULL);
2667
2668   pthread_cond_broadcast (&queue_cond);
2669   for (int i = 0; i < config_queue_threads; i++)
2670     pthread_join (queue_threads[i], NULL);
2671
2672   if (config_flush_at_shutdown)
2673   {
2674     assert(cache_queue_head == NULL);
2675     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2676   }
2677
2678   free(queue_threads);
2679   free(config_base_dir);
2680
2681   pthread_mutex_lock(&cache_lock);
2682   g_tree_destroy(cache_tree);
2683
2684   pthread_mutex_lock(&journal_lock);
2685   journal_done();
2686
2687   RRDD_LOG(LOG_INFO, "goodbye");
2688   closelog ();
2689
2690   remove_pidfile ();
2691   free(config_pid_file);
2692
2693   return (0);
2694 } /* }}} int cleanup */
2695
2696 static int read_options (int argc, char **argv) /* {{{ */
2697 {
2698   int option;
2699   int status = 0;
2700
2701   char **permissions = NULL;
2702   size_t permissions_len = 0;
2703
2704   while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2705   {
2706     switch (option)
2707     {
2708       case 'g':
2709         stay_foreground=1;
2710         break;
2711
2712       case 'l':
2713       {
2714         listen_socket_t *new;
2715
2716         new = malloc(sizeof(listen_socket_t));
2717         if (new == NULL)
2718         {
2719           fprintf(stderr, "read_options: malloc failed.\n");
2720           return(2);
2721         }
2722         memset(new, 0, sizeof(listen_socket_t));
2723
2724         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2725
2726         /* Add permissions to the socket {{{ */
2727         if (permissions_len != 0)
2728         {
2729           size_t i;
2730           for (i = 0; i < permissions_len; i++)
2731           {
2732             status = socket_permission_add (new, permissions[i]);
2733             if (status != 0)
2734             {
2735               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2736                   "socket failed. Most likely, this permission doesn't "
2737                   "exist. Check your command line.\n", permissions[i]);
2738               status = 4;
2739             }
2740           }
2741         }
2742         else /* if (permissions_len == 0) */
2743         {
2744           /* Add permission for ALL commands to the socket. */
2745           size_t i;
2746           for (i = 0; i < list_of_commands_len; i++)
2747           {
2748             status = socket_permission_add (new, list_of_commands[i].cmd);
2749             if (status != 0)
2750             {
2751               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2752                   "socket failed. This should never happen, ever! Sorry.\n",
2753                   permissions[i]);
2754               status = 4;
2755             }
2756           }
2757         }
2758         /* }}} Done adding permissions. */
2759
2760         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2761                          &config_listen_address_list_len, new))
2762         {
2763           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2764           return (2);
2765         }
2766       }
2767       break;
2768
2769       case 'P':
2770       {
2771         char *optcopy;
2772         char *saveptr;
2773         char *dummy;
2774         char *ptr;
2775
2776         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2777
2778         optcopy = strdup (optarg);
2779         dummy = optcopy;
2780         saveptr = NULL;
2781         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2782         {
2783           dummy = NULL;
2784           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2785         }
2786
2787         free (optcopy);
2788       }
2789       break;
2790
2791       case 'f':
2792       {
2793         int temp;
2794
2795         temp = atoi (optarg);
2796         if (temp > 0)
2797           config_flush_interval = temp;
2798         else
2799         {
2800           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2801           status = 3;
2802         }
2803       }
2804       break;
2805
2806       case 'w':
2807       {
2808         int temp;
2809
2810         temp = atoi (optarg);
2811         if (temp > 0)
2812           config_write_interval = temp;
2813         else
2814         {
2815           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2816           status = 2;
2817         }
2818       }
2819       break;
2820
2821       case 'z':
2822       {
2823         int temp;
2824
2825         temp = atoi(optarg);
2826         if (temp > 0)
2827           config_write_jitter = temp;
2828         else
2829         {
2830           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2831           status = 2;
2832         }
2833
2834         break;
2835       }
2836
2837       case 't':
2838       {
2839         int threads;
2840         threads = atoi(optarg);
2841         if (threads >= 1)
2842           config_queue_threads = threads;
2843         else
2844         {
2845           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2846           return 1;
2847         }
2848       }
2849       break;
2850
2851       case 'B':
2852         config_write_base_only = 1;
2853         break;
2854
2855       case 'b':
2856       {
2857         size_t len;
2858         char base_realpath[PATH_MAX];
2859
2860         if (config_base_dir != NULL)
2861           free (config_base_dir);
2862         config_base_dir = strdup (optarg);
2863         if (config_base_dir == NULL)
2864         {
2865           fprintf (stderr, "read_options: strdup failed.\n");
2866           return (3);
2867         }
2868
2869         /* make sure that the base directory is not resolved via
2870          * symbolic links.  this makes some performance-enhancing
2871          * assumptions possible (we don't have to resolve paths
2872          * that start with a "/")
2873          */
2874         if (realpath(config_base_dir, base_realpath) == NULL)
2875         {
2876           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2877           return 5;
2878         }
2879         else if (strncmp(config_base_dir,
2880                          base_realpath, sizeof(base_realpath)) != 0)
2881         {
2882           fprintf(stderr,
2883                   "Base directory (-b) resolved via file system links!\n"
2884                   "Please consult rrdcached '-b' documentation!\n"
2885                   "Consider specifying the real directory (%s)\n",
2886                   base_realpath);
2887           return 5;
2888         }
2889
2890         len = strlen (config_base_dir);
2891         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2892         {
2893           config_base_dir[len - 1] = 0;
2894           len--;
2895         }
2896
2897         if (len < 1)
2898         {
2899           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2900           return (4);
2901         }
2902
2903         _config_base_dir_len = len;
2904       }
2905       break;
2906
2907       case 'p':
2908       {
2909         if (config_pid_file != NULL)
2910           free (config_pid_file);
2911         config_pid_file = strdup (optarg);
2912         if (config_pid_file == NULL)
2913         {
2914           fprintf (stderr, "read_options: strdup failed.\n");
2915           return (3);
2916         }
2917       }
2918       break;
2919
2920       case 'F':
2921         config_flush_at_shutdown = 1;
2922         break;
2923
2924       case 'j':
2925       {
2926         const char *dir = journal_dir = strdup(optarg);
2927
2928         status = rrd_mkdir_p(dir, 0777);
2929         if (status != 0)
2930         {
2931           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2932               dir, rrd_strerror(errno));
2933           return 6;
2934         }
2935
2936         if (access(dir, R_OK|W_OK|X_OK) != 0)
2937         {
2938           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2939                   errno ? rrd_strerror(errno) : "");
2940           return 6;
2941         }
2942       }
2943       break;
2944
2945       case 'h':
2946       case '?':
2947         printf ("RRDCacheD %s\n"
2948             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
2949             "\n"
2950             "Usage: rrdcached [options]\n"
2951             "\n"
2952             "Valid options are:\n"
2953             "  -l <address>  Socket address to listen to.\n"
2954             "  -P <perms>    Sets the permissions to assign to all following "
2955                             "sockets\n"
2956             "  -w <seconds>  Interval in which to write data.\n"
2957             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2958             "  -t <threads>  Number of write threads.\n"
2959             "  -f <seconds>  Interval in which to flush dead data.\n"
2960             "  -p <file>     Location of the PID-file.\n"
2961             "  -b <dir>      Base directory to change to.\n"
2962             "  -B            Restrict file access to paths within -b <dir>\n"
2963             "  -g            Do not fork and run in the foreground.\n"
2964             "  -j <dir>      Directory in which to create the journal files.\n"
2965             "  -F            Always flush all updates at shutdown\n"
2966             "\n"
2967             "For more information and a detailed description of all options "
2968             "please refer\n"
2969             "to the rrdcached(1) manual page.\n",
2970             VERSION);
2971         status = -1;
2972         break;
2973     } /* switch (option) */
2974   } /* while (getopt) */
2975
2976   /* advise the user when values are not sane */
2977   if (config_flush_interval < 2 * config_write_interval)
2978     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2979             " 2x write interval (-w) !\n");
2980   if (config_write_jitter > config_write_interval)
2981     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2982             " write interval (-w) !\n");
2983
2984   if (config_write_base_only && config_base_dir == NULL)
2985     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2986             "  Consult the rrdcached documentation\n");
2987
2988   if (journal_dir == NULL)
2989     config_flush_at_shutdown = 1;
2990
2991   rrd_free_ptrs ((void *) &permissions, &permissions_len);
2992
2993   return (status);
2994 } /* }}} int read_options */
2995
2996 int main (int argc, char **argv)
2997 {
2998   int status;
2999
3000   status = read_options (argc, argv);
3001   if (status != 0)
3002   {
3003     if (status < 0)
3004       status = 0;
3005     return (status);
3006   }
3007
3008   status = daemonize ();
3009   if (status != 0)
3010   {
3011     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3012     return (1);
3013   }
3014
3015   journal_init();
3016
3017   /* start the queue threads */
3018   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3019   if (queue_threads == NULL)
3020   {
3021     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3022     cleanup();
3023     return (1);
3024   }
3025   for (int i = 0; i < config_queue_threads; i++)
3026   {
3027     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3028     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3029     if (status != 0)
3030     {
3031       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3032       cleanup();
3033       return (1);
3034     }
3035   }
3036
3037   /* start the flush thread */
3038   memset(&flush_thread, 0, sizeof(flush_thread));
3039   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3040   if (status != 0)
3041   {
3042     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3043     cleanup();
3044     return (1);
3045   }
3046
3047   listen_thread_main (NULL);
3048   cleanup ();
3049
3050   return (0);
3051 } /* int main */
3052
3053 /*
3054  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3055  */