From: Florian Forster <octo@leeloo.lan.home.verplant.org>
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <dirent.h>
95 #include <fcntl.h>
96 #include <signal.h>
97 #include <sys/un.h>
98 #include <netdb.h>
99 #include <poll.h>
100 #include <syslog.h>
101 #include <pthread.h>
102 #include <errno.h>
103 #include <assert.h>
104 #include <sys/time.h>
105 #include <time.h>
106
107 #include <glib-2.0/glib.h>
108 /* }}} */
109
110 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111
112 #ifndef __GNUC__
113 # define __attribute__(x) /**/
114 #endif
115
116 /*
117  * Types
118  */
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126
127   /* state for BATCH processing */
128   time_t batch_start;
129   int batch_cmd;
130
131   /* buffered IO */
132   char *rbuf;
133   off_t next_cmd;
134   off_t next_read;
135
136   char *wbuf;
137   ssize_t wbuf_len;
138
139   uint32_t permissions;
140 };
141 typedef struct listen_socket_s listen_socket_t;
142
143 struct command_s;
144 typedef struct command_s command_t;
145 /* note: guard against "unused" warnings in the handlers */
146 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
147                         time_t now              __attribute__((unused)),\
148                         char  *buffer           __attribute__((unused)),\
149                         size_t buffer_size      __attribute__((unused))
150
151 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
152                         DISPATCH_PROTO
153
154 struct command_s {
155   char   *cmd;
156   int (*handler)(HANDLER_PROTO);
157
158   char  context;                /* where we expect to see it */
159 #define CMD_CONTEXT_CLIENT      (1<<0)
160 #define CMD_CONTEXT_BATCH       (1<<1)
161 #define CMD_CONTEXT_JOURNAL     (1<<2)
162 #define CMD_CONTEXT_ANY         (0x7f)
163
164   char *syntax;
165   char *help;
166 };
167
168 struct cache_item_s;
169 typedef struct cache_item_s cache_item_t;
170 struct cache_item_s
171 {
172   char *file;
173   char **values;
174   size_t values_num;
175   time_t last_flush_time;
176   time_t last_update_stamp;
177 #define CI_FLAGS_IN_TREE  (1<<0)
178 #define CI_FLAGS_IN_QUEUE (1<<1)
179   int flags;
180   pthread_cond_t  flushed;
181   cache_item_t *prev;
182   cache_item_t *next;
183 };
184
185 struct callback_flush_data_s
186 {
187   time_t now;
188   time_t abs_timeout;
189   char **keys;
190   size_t keys_num;
191 };
192 typedef struct callback_flush_data_s callback_flush_data_t;
193
194 enum queue_side_e
195 {
196   HEAD,
197   TAIL
198 };
199 typedef enum queue_side_e queue_side_t;
200
201 /* describe a set of journal files */
202 typedef struct {
203   char **files;
204   size_t files_num;
205 } journal_set;
206
207 /* max length of socket command or response */
208 #define CMD_MAX 4096
209 #define RBUF_SIZE (CMD_MAX*2)
210
211 /*
212  * Variables
213  */
214 static int stay_foreground = 0;
215 static uid_t daemon_uid;
216
217 static listen_socket_t *listen_fds = NULL;
218 static size_t listen_fds_num = 0;
219
220 enum {
221   RUNNING,              /* normal operation */
222   FLUSHING,             /* flushing remaining values */
223   SHUTDOWN              /* shutting down */
224 } state = RUNNING;
225
226 static pthread_t *queue_threads;
227 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
228 static int config_queue_threads = 4;
229
230 static pthread_t flush_thread;
231 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
232
233 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
234 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
235 static int connection_threads_num = 0;
236
237 /* Cache stuff */
238 static GTree          *cache_tree = NULL;
239 static cache_item_t   *cache_queue_head = NULL;
240 static cache_item_t   *cache_queue_tail = NULL;
241 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
242
243 static int config_write_interval = 300;
244 static int config_write_jitter   = 0;
245 static int config_flush_interval = 3600;
246 static int config_flush_at_shutdown = 0;
247 static char *config_pid_file = NULL;
248 static char *config_base_dir = NULL;
249 static size_t _config_base_dir_len = 0;
250 static int config_write_base_only = 0;
251
252 static listen_socket_t **config_listen_address_list = NULL;
253 static size_t config_listen_address_list_len = 0;
254
255 static uint64_t stats_queue_length = 0;
256 static uint64_t stats_updates_received = 0;
257 static uint64_t stats_flush_received = 0;
258 static uint64_t stats_updates_written = 0;
259 static uint64_t stats_data_sets_written = 0;
260 static uint64_t stats_journal_bytes = 0;
261 static uint64_t stats_journal_rotate = 0;
262 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
263
264 /* Journaled updates */
265 #define JOURNAL_BASE "rrd.journal"
266 static journal_set *journal_cur = NULL;
267 static journal_set *journal_old = NULL;
268 static char *journal_dir = NULL;
269 static FILE *journal_fh = NULL;         /* current journal file handle */
270 static long  journal_size = 0;          /* current journal size */
271 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
272 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
273 static int journal_write(char *cmd, char *args);
274 static void journal_done(void);
275 static void journal_rotate(void);
276
277 /* prototypes for forward refernces */
278 static int handle_request_help (HANDLER_PROTO);
279
280 /* 
281  * Functions
282  */
283 static void sig_common (const char *sig) /* {{{ */
284 {
285   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
286   state = FLUSHING;
287   pthread_cond_broadcast(&flush_cond);
288   pthread_cond_broadcast(&queue_cond);
289 } /* }}} void sig_common */
290
291 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
292 {
293   sig_common("INT");
294 } /* }}} void sig_int_handler */
295
296 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
297 {
298   sig_common("TERM");
299 } /* }}} void sig_term_handler */
300
301 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
302 {
303   config_flush_at_shutdown = 1;
304   sig_common("USR1");
305 } /* }}} void sig_usr1_handler */
306
307 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
308 {
309   config_flush_at_shutdown = 0;
310   sig_common("USR2");
311 } /* }}} void sig_usr2_handler */
312
313 static void install_signal_handlers(void) /* {{{ */
314 {
315   /* These structures are static, because `sigaction' behaves weird if the are
316    * overwritten.. */
317   static struct sigaction sa_int;
318   static struct sigaction sa_term;
319   static struct sigaction sa_pipe;
320   static struct sigaction sa_usr1;
321   static struct sigaction sa_usr2;
322
323   /* Install signal handlers */
324   memset (&sa_int, 0, sizeof (sa_int));
325   sa_int.sa_handler = sig_int_handler;
326   sigaction (SIGINT, &sa_int, NULL);
327
328   memset (&sa_term, 0, sizeof (sa_term));
329   sa_term.sa_handler = sig_term_handler;
330   sigaction (SIGTERM, &sa_term, NULL);
331
332   memset (&sa_pipe, 0, sizeof (sa_pipe));
333   sa_pipe.sa_handler = SIG_IGN;
334   sigaction (SIGPIPE, &sa_pipe, NULL);
335
336   memset (&sa_pipe, 0, sizeof (sa_usr1));
337   sa_usr1.sa_handler = sig_usr1_handler;
338   sigaction (SIGUSR1, &sa_usr1, NULL);
339
340   memset (&sa_usr2, 0, sizeof (sa_usr2));
341   sa_usr2.sa_handler = sig_usr2_handler;
342   sigaction (SIGUSR2, &sa_usr2, NULL);
343
344 } /* }}} void install_signal_handlers */
345
346 static int open_pidfile(char *action, int oflag) /* {{{ */
347 {
348   int fd;
349   char *file;
350
351   file = (config_pid_file != NULL)
352     ? config_pid_file
353     : LOCALSTATEDIR "/run/rrdcached.pid";
354
355   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
356   if (fd < 0)
357     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
358             action, file, rrd_strerror(errno));
359
360   return(fd);
361 } /* }}} static int open_pidfile */
362
363 /* check existing pid file to see whether a daemon is running */
364 static int check_pidfile(void)
365 {
366   int pid_fd;
367   pid_t pid;
368   char pid_str[16];
369
370   pid_fd = open_pidfile("open", O_RDWR);
371   if (pid_fd < 0)
372     return pid_fd;
373
374   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
375     return -1;
376
377   pid = atoi(pid_str);
378   if (pid <= 0)
379     return -1;
380
381   /* another running process that we can signal COULD be
382    * a competing rrdcached */
383   if (pid != getpid() && kill(pid, 0) == 0)
384   {
385     fprintf(stderr,
386             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
387     close(pid_fd);
388     return -1;
389   }
390
391   lseek(pid_fd, 0, SEEK_SET);
392   if (ftruncate(pid_fd, 0) == -1)
393   {
394     fprintf(stderr,
395             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
396     close(pid_fd);
397     return -1;
398   }
399
400   fprintf(stderr,
401           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
402           "rrdcached: starting normally.\n", pid);
403
404   return pid_fd;
405 } /* }}} static int check_pidfile */
406
407 static int write_pidfile (int fd) /* {{{ */
408 {
409   pid_t pid;
410   FILE *fh;
411
412   pid = getpid ();
413
414   fh = fdopen (fd, "w");
415   if (fh == NULL)
416   {
417     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
418     close(fd);
419     return (-1);
420   }
421
422   fprintf (fh, "%i\n", (int) pid);
423   fclose (fh);
424
425   return (0);
426 } /* }}} int write_pidfile */
427
428 static int remove_pidfile (void) /* {{{ */
429 {
430   char *file;
431   int status;
432
433   file = (config_pid_file != NULL)
434     ? config_pid_file
435     : LOCALSTATEDIR "/run/rrdcached.pid";
436
437   status = unlink (file);
438   if (status == 0)
439     return (0);
440   return (errno);
441 } /* }}} int remove_pidfile */
442
443 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
444 {
445   char *eol;
446
447   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
448                sock->next_read - sock->next_cmd);
449
450   if (eol == NULL)
451   {
452     /* no commands left, move remainder back to front of rbuf */
453     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
454             sock->next_read - sock->next_cmd);
455     sock->next_read -= sock->next_cmd;
456     sock->next_cmd = 0;
457     *len = 0;
458     return NULL;
459   }
460   else
461   {
462     char *cmd = sock->rbuf + sock->next_cmd;
463     *eol = '\0';
464
465     sock->next_cmd = eol - sock->rbuf + 1;
466
467     if (eol > sock->rbuf && *(eol-1) == '\r')
468       *(--eol) = '\0'; /* handle "\r\n" EOL */
469
470     *len = eol - cmd;
471
472     return cmd;
473   }
474
475   /* NOTREACHED */
476   assert(1==0);
477 } /* }}} char *next_cmd */
478
479 /* add the characters directly to the write buffer */
480 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
481 {
482   char *new_buf;
483
484   assert(sock != NULL);
485
486   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
487   if (new_buf == NULL)
488   {
489     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
490     return -1;
491   }
492
493   strncpy(new_buf + sock->wbuf_len, str, len + 1);
494
495   sock->wbuf = new_buf;
496   sock->wbuf_len += len;
497
498   return 0;
499 } /* }}} static int add_to_wbuf */
500
501 /* add the text to the "extra" info that's sent after the status line */
502 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
503 {
504   va_list argp;
505   char buffer[CMD_MAX];
506   int len;
507
508   if (sock == NULL) return 0; /* journal replay mode */
509   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
510
511   va_start(argp, fmt);
512 #ifdef HAVE_VSNPRINTF
513   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
514 #else
515   len = vsprintf(buffer, fmt, argp);
516 #endif
517   va_end(argp);
518   if (len < 0)
519   {
520     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
521     return -1;
522   }
523
524   return add_to_wbuf(sock, buffer, len);
525 } /* }}} static int add_response_info */
526
527 static int count_lines(char *str) /* {{{ */
528 {
529   int lines = 0;
530
531   if (str != NULL)
532   {
533     while ((str = strchr(str, '\n')) != NULL)
534     {
535       ++lines;
536       ++str;
537     }
538   }
539
540   return lines;
541 } /* }}} static int count_lines */
542
543 /* send the response back to the user.
544  * returns 0 on success, -1 on error
545  * write buffer is always zeroed after this call */
546 static int send_response (listen_socket_t *sock, response_code rc,
547                           char *fmt, ...) /* {{{ */
548 {
549   va_list argp;
550   char buffer[CMD_MAX];
551   int lines;
552   ssize_t wrote;
553   int rclen, len;
554
555   if (sock == NULL) return rc;  /* journal replay mode */
556
557   if (sock->batch_start)
558   {
559     if (rc == RESP_OK)
560       return rc; /* no response on success during BATCH */
561     lines = sock->batch_cmd;
562   }
563   else if (rc == RESP_OK)
564     lines = count_lines(sock->wbuf);
565   else
566     lines = -1;
567
568   rclen = sprintf(buffer, "%d ", lines);
569   va_start(argp, fmt);
570 #ifdef HAVE_VSNPRINTF
571   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
572 #else
573   len = vsprintf(buffer+rclen, fmt, argp);
574 #endif
575   va_end(argp);
576   if (len < 0)
577     return -1;
578
579   len += rclen;
580
581   /* append the result to the wbuf, don't write to the user */
582   if (sock->batch_start)
583     return add_to_wbuf(sock, buffer, len);
584
585   /* first write must be complete */
586   if (len != write(sock->fd, buffer, len))
587   {
588     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
589     return -1;
590   }
591
592   if (sock->wbuf != NULL && rc == RESP_OK)
593   {
594     wrote = 0;
595     while (wrote < sock->wbuf_len)
596     {
597       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
598       if (wb <= 0)
599       {
600         RRDD_LOG(LOG_INFO, "send_response: could not write results");
601         return -1;
602       }
603       wrote += wb;
604     }
605   }
606
607   free(sock->wbuf); sock->wbuf = NULL;
608   sock->wbuf_len = 0;
609
610   return 0;
611 } /* }}} */
612
613 static void wipe_ci_values(cache_item_t *ci, time_t when)
614 {
615   ci->values = NULL;
616   ci->values_num = 0;
617
618   ci->last_flush_time = when;
619   if (config_write_jitter > 0)
620     ci->last_flush_time += (rrd_random() % config_write_jitter);
621 }
622
623 /* remove_from_queue
624  * remove a "cache_item_t" item from the queue.
625  * must hold 'cache_lock' when calling this
626  */
627 static void remove_from_queue(cache_item_t *ci) /* {{{ */
628 {
629   if (ci == NULL) return;
630   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
631
632   if (ci->prev == NULL)
633     cache_queue_head = ci->next; /* reset head */
634   else
635     ci->prev->next = ci->next;
636
637   if (ci->next == NULL)
638     cache_queue_tail = ci->prev; /* reset the tail */
639   else
640     ci->next->prev = ci->prev;
641
642   ci->next = ci->prev = NULL;
643   ci->flags &= ~CI_FLAGS_IN_QUEUE;
644
645   pthread_mutex_lock (&stats_lock);
646   assert (stats_queue_length > 0);
647   stats_queue_length--;
648   pthread_mutex_unlock (&stats_lock);
649
650 } /* }}} static void remove_from_queue */
651
652 /* free the resources associated with the cache_item_t
653  * must hold cache_lock when calling this function
654  */
655 static void *free_cache_item(cache_item_t *ci) /* {{{ */
656 {
657   if (ci == NULL) return NULL;
658
659   remove_from_queue(ci);
660
661   for (size_t i=0; i < ci->values_num; i++)
662     free(ci->values[i]);
663
664   free (ci->values);
665   free (ci->file);
666
667   /* in case anyone is waiting */
668   pthread_cond_broadcast(&ci->flushed);
669   pthread_cond_destroy(&ci->flushed);
670
671   free (ci);
672
673   return NULL;
674 } /* }}} static void *free_cache_item */
675
676 /*
677  * enqueue_cache_item:
678  * `cache_lock' must be acquired before calling this function!
679  */
680 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
681     queue_side_t side)
682 {
683   if (ci == NULL)
684     return (-1);
685
686   if (ci->values_num == 0)
687     return (0);
688
689   if (side == HEAD)
690   {
691     if (cache_queue_head == ci)
692       return 0;
693
694     /* remove if further down in queue */
695     remove_from_queue(ci);
696
697     ci->prev = NULL;
698     ci->next = cache_queue_head;
699     if (ci->next != NULL)
700       ci->next->prev = ci;
701     cache_queue_head = ci;
702
703     if (cache_queue_tail == NULL)
704       cache_queue_tail = cache_queue_head;
705   }
706   else /* (side == TAIL) */
707   {
708     /* We don't move values back in the list.. */
709     if (ci->flags & CI_FLAGS_IN_QUEUE)
710       return (0);
711
712     assert (ci->next == NULL);
713     assert (ci->prev == NULL);
714
715     ci->prev = cache_queue_tail;
716
717     if (cache_queue_tail == NULL)
718       cache_queue_head = ci;
719     else
720       cache_queue_tail->next = ci;
721
722     cache_queue_tail = ci;
723   }
724
725   ci->flags |= CI_FLAGS_IN_QUEUE;
726
727   pthread_cond_signal(&queue_cond);
728   pthread_mutex_lock (&stats_lock);
729   stats_queue_length++;
730   pthread_mutex_unlock (&stats_lock);
731
732   return (0);
733 } /* }}} int enqueue_cache_item */
734
735 /*
736  * tree_callback_flush:
737  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
738  * while this is in progress.
739  */
740 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
741     gpointer data)
742 {
743   cache_item_t *ci;
744   callback_flush_data_t *cfd;
745
746   ci = (cache_item_t *) value;
747   cfd = (callback_flush_data_t *) data;
748
749   if (ci->flags & CI_FLAGS_IN_QUEUE)
750     return FALSE;
751
752   if (ci->values_num > 0
753       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
754   {
755     enqueue_cache_item (ci, TAIL);
756   }
757   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
758       && (ci->values_num <= 0))
759   {
760     assert ((char *) key == ci->file);
761     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
762     {
763       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
764       return (FALSE);
765     }
766   }
767
768   return (FALSE);
769 } /* }}} gboolean tree_callback_flush */
770
771 static int flush_old_values (int max_age)
772 {
773   callback_flush_data_t cfd;
774   size_t k;
775
776   memset (&cfd, 0, sizeof (cfd));
777   /* Pass the current time as user data so that we don't need to call
778    * `time' for each node. */
779   cfd.now = time (NULL);
780   cfd.keys = NULL;
781   cfd.keys_num = 0;
782
783   if (max_age > 0)
784     cfd.abs_timeout = cfd.now - max_age;
785   else
786     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
787
788   /* `tree_callback_flush' will return the keys of all values that haven't
789    * been touched in the last `config_flush_interval' seconds in `cfd'.
790    * The char*'s in this array point to the same memory as ci->file, so we
791    * don't need to free them separately. */
792   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
793
794   for (k = 0; k < cfd.keys_num; k++)
795   {
796     /* should never fail, since we have held the cache_lock
797      * the entire time */
798     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
799   }
800
801   if (cfd.keys != NULL)
802   {
803     free (cfd.keys);
804     cfd.keys = NULL;
805   }
806
807   return (0);
808 } /* int flush_old_values */
809
810 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
811 {
812   struct timeval now;
813   struct timespec next_flush;
814   int status;
815
816   gettimeofday (&now, NULL);
817   next_flush.tv_sec = now.tv_sec + config_flush_interval;
818   next_flush.tv_nsec = 1000 * now.tv_usec;
819
820   pthread_mutex_lock(&cache_lock);
821
822   while (state == RUNNING)
823   {
824     gettimeofday (&now, NULL);
825     if ((now.tv_sec > next_flush.tv_sec)
826         || ((now.tv_sec == next_flush.tv_sec)
827           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
828     {
829       RRDD_LOG(LOG_DEBUG, "flushing old values");
830
831       /* Determine the time of the next cache flush. */
832       next_flush.tv_sec = now.tv_sec + config_flush_interval;
833
834       /* Flush all values that haven't been written in the last
835        * `config_write_interval' seconds. */
836       flush_old_values (config_write_interval);
837
838       /* unlock the cache while we rotate so we don't block incoming
839        * updates if the fsync() blocks on disk I/O */
840       pthread_mutex_unlock(&cache_lock);
841       journal_rotate();
842       pthread_mutex_lock(&cache_lock);
843     }
844
845     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
846     if (status != 0 && status != ETIMEDOUT)
847     {
848       RRDD_LOG (LOG_ERR, "flush_thread_main: "
849                 "pthread_cond_timedwait returned %i.", status);
850     }
851   }
852
853   if (config_flush_at_shutdown)
854     flush_old_values (-1); /* flush everything */
855
856   state = SHUTDOWN;
857
858   pthread_mutex_unlock(&cache_lock);
859
860   return NULL;
861 } /* void *flush_thread_main */
862
863 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
864 {
865   pthread_mutex_lock (&cache_lock);
866
867   while (state != SHUTDOWN
868          || (cache_queue_head != NULL && config_flush_at_shutdown))
869   {
870     cache_item_t *ci;
871     char *file;
872     char **values;
873     size_t values_num;
874     int status;
875
876     /* Now, check if there's something to store away. If not, wait until
877      * something comes in. */
878     if (cache_queue_head == NULL)
879     {
880       status = pthread_cond_wait (&queue_cond, &cache_lock);
881       if ((status != 0) && (status != ETIMEDOUT))
882       {
883         RRDD_LOG (LOG_ERR, "queue_thread_main: "
884             "pthread_cond_wait returned %i.", status);
885       }
886     }
887
888     /* Check if a value has arrived. This may be NULL if we timed out or there
889      * was an interrupt such as a signal. */
890     if (cache_queue_head == NULL)
891       continue;
892
893     ci = cache_queue_head;
894
895     /* copy the relevant parts */
896     file = strdup (ci->file);
897     if (file == NULL)
898     {
899       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
900       continue;
901     }
902
903     assert(ci->values != NULL);
904     assert(ci->values_num > 0);
905
906     values = ci->values;
907     values_num = ci->values_num;
908
909     wipe_ci_values(ci, time(NULL));
910     remove_from_queue(ci);
911
912     pthread_mutex_unlock (&cache_lock);
913
914     rrd_clear_error ();
915     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
916     if (status != 0)
917     {
918       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
919           "rrd_update_r (%s) failed with status %i. (%s)",
920           file, status, rrd_get_error());
921     }
922
923     journal_write("wrote", file);
924
925     /* Search again in the tree.  It's possible someone issued a "FORGET"
926      * while we were writing the update values. */
927     pthread_mutex_lock(&cache_lock);
928     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
929     if (ci)
930       pthread_cond_broadcast(&ci->flushed);
931     pthread_mutex_unlock(&cache_lock);
932
933     if (status == 0)
934     {
935       pthread_mutex_lock (&stats_lock);
936       stats_updates_written++;
937       stats_data_sets_written += values_num;
938       pthread_mutex_unlock (&stats_lock);
939     }
940
941     rrd_free_ptrs((void ***) &values, &values_num);
942     free(file);
943
944     pthread_mutex_lock (&cache_lock);
945   }
946   pthread_mutex_unlock (&cache_lock);
947
948   return (NULL);
949 } /* }}} void *queue_thread_main */
950
951 static int buffer_get_field (char **buffer_ret, /* {{{ */
952     size_t *buffer_size_ret, char **field_ret)
953 {
954   char *buffer;
955   size_t buffer_pos;
956   size_t buffer_size;
957   char *field;
958   size_t field_size;
959   int status;
960
961   buffer = *buffer_ret;
962   buffer_pos = 0;
963   buffer_size = *buffer_size_ret;
964   field = *buffer_ret;
965   field_size = 0;
966
967   if (buffer_size <= 0)
968     return (-1);
969
970   /* This is ensured by `handle_request'. */
971   assert (buffer[buffer_size - 1] == '\0');
972
973   status = -1;
974   while (buffer_pos < buffer_size)
975   {
976     /* Check for end-of-field or end-of-buffer */
977     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
978     {
979       field[field_size] = 0;
980       field_size++;
981       buffer_pos++;
982       status = 0;
983       break;
984     }
985     /* Handle escaped characters. */
986     else if (buffer[buffer_pos] == '\\')
987     {
988       if (buffer_pos >= (buffer_size - 1))
989         break;
990       buffer_pos++;
991       field[field_size] = buffer[buffer_pos];
992       field_size++;
993       buffer_pos++;
994     }
995     /* Normal operation */ 
996     else
997     {
998       field[field_size] = buffer[buffer_pos];
999       field_size++;
1000       buffer_pos++;
1001     }
1002   } /* while (buffer_pos < buffer_size) */
1003
1004   if (status != 0)
1005     return (status);
1006
1007   *buffer_ret = buffer + buffer_pos;
1008   *buffer_size_ret = buffer_size - buffer_pos;
1009   *field_ret = field;
1010
1011   return (0);
1012 } /* }}} int buffer_get_field */
1013
1014 /* if we're restricting writes to the base directory,
1015  * check whether the file falls within the dir
1016  * returns 1 if OK, otherwise 0
1017  */
1018 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1019 {
1020   assert(file != NULL);
1021
1022   if (!config_write_base_only
1023       || sock == NULL /* journal replay */
1024       || config_base_dir == NULL)
1025     return 1;
1026
1027   if (strstr(file, "../") != NULL) goto err;
1028
1029   /* relative paths without "../" are ok */
1030   if (*file != '/') return 1;
1031
1032   /* file must be of the format base + "/" + <1+ char filename> */
1033   if (strlen(file) < _config_base_dir_len + 2) goto err;
1034   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1035   if (*(file + _config_base_dir_len) != '/') goto err;
1036
1037   return 1;
1038
1039 err:
1040   if (sock != NULL && sock->fd >= 0)
1041     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1042
1043   return 0;
1044 } /* }}} static int check_file_access */
1045
1046 /* when using a base dir, convert relative paths to absolute paths.
1047  * if necessary, modifies the "filename" pointer to point
1048  * to the new path created in "tmp".  "tmp" is provided
1049  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1050  *
1051  * this allows us to optimize for the expected case (absolute path)
1052  * with a no-op.
1053  */
1054 static void get_abs_path(char **filename, char *tmp)
1055 {
1056   assert(tmp != NULL);
1057   assert(filename != NULL && *filename != NULL);
1058
1059   if (config_base_dir == NULL || **filename == '/')
1060     return;
1061
1062   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1063   *filename = tmp;
1064 } /* }}} static int get_abs_path */
1065
1066 static int flush_file (const char *filename) /* {{{ */
1067 {
1068   cache_item_t *ci;
1069
1070   pthread_mutex_lock (&cache_lock);
1071
1072   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1073   if (ci == NULL)
1074   {
1075     pthread_mutex_unlock (&cache_lock);
1076     return (ENOENT);
1077   }
1078
1079   if (ci->values_num > 0)
1080   {
1081     /* Enqueue at head */
1082     enqueue_cache_item (ci, HEAD);
1083     pthread_cond_wait(&ci->flushed, &cache_lock);
1084   }
1085
1086   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1087    * may have been purged during our cond_wait() */
1088
1089   pthread_mutex_unlock(&cache_lock);
1090
1091   return (0);
1092 } /* }}} int flush_file */
1093
1094 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1095 {
1096   char *err = "Syntax error.\n";
1097
1098   if (cmd && cmd->syntax)
1099     err = cmd->syntax;
1100
1101   return send_response(sock, RESP_ERR, "Usage: %s", err);
1102 } /* }}} static int syntax_error() */
1103
1104 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1105 {
1106   uint64_t copy_queue_length;
1107   uint64_t copy_updates_received;
1108   uint64_t copy_flush_received;
1109   uint64_t copy_updates_written;
1110   uint64_t copy_data_sets_written;
1111   uint64_t copy_journal_bytes;
1112   uint64_t copy_journal_rotate;
1113
1114   uint64_t tree_nodes_number;
1115   uint64_t tree_depth;
1116
1117   pthread_mutex_lock (&stats_lock);
1118   copy_queue_length       = stats_queue_length;
1119   copy_updates_received   = stats_updates_received;
1120   copy_flush_received     = stats_flush_received;
1121   copy_updates_written    = stats_updates_written;
1122   copy_data_sets_written  = stats_data_sets_written;
1123   copy_journal_bytes      = stats_journal_bytes;
1124   copy_journal_rotate     = stats_journal_rotate;
1125   pthread_mutex_unlock (&stats_lock);
1126
1127   pthread_mutex_lock (&cache_lock);
1128   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1129   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1130   pthread_mutex_unlock (&cache_lock);
1131
1132   add_response_info(sock,
1133                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1134   add_response_info(sock,
1135                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1136   add_response_info(sock,
1137                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1138   add_response_info(sock,
1139                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1140   add_response_info(sock,
1141                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1142   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1143   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1144   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1145   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1146
1147   send_response(sock, RESP_OK, "Statistics follow\n");
1148
1149   return (0);
1150 } /* }}} int handle_request_stats */
1151
1152 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1153 {
1154   char *file, file_tmp[PATH_MAX];
1155   int status;
1156
1157   status = buffer_get_field (&buffer, &buffer_size, &file);
1158   if (status != 0)
1159   {
1160     return syntax_error(sock,cmd);
1161   }
1162   else
1163   {
1164     pthread_mutex_lock(&stats_lock);
1165     stats_flush_received++;
1166     pthread_mutex_unlock(&stats_lock);
1167
1168     get_abs_path(&file, file_tmp);
1169     if (!check_file_access(file, sock)) return 0;
1170
1171     status = flush_file (file);
1172     if (status == 0)
1173       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1174     else if (status == ENOENT)
1175     {
1176       /* no file in our tree; see whether it exists at all */
1177       struct stat statbuf;
1178
1179       memset(&statbuf, 0, sizeof(statbuf));
1180       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1181         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1182       else
1183         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1184     }
1185     else if (status < 0)
1186       return send_response(sock, RESP_ERR, "Internal error.\n");
1187     else
1188       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1189   }
1190
1191   /* NOTREACHED */
1192   assert(1==0);
1193 } /* }}} int handle_request_flush */
1194
1195 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1196 {
1197   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1198
1199   pthread_mutex_lock(&cache_lock);
1200   flush_old_values(-1);
1201   pthread_mutex_unlock(&cache_lock);
1202
1203   return send_response(sock, RESP_OK, "Started flush.\n");
1204 } /* }}} static int handle_request_flushall */
1205
1206 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1207 {
1208   int status;
1209   char *file, file_tmp[PATH_MAX];
1210   cache_item_t *ci;
1211
1212   status = buffer_get_field(&buffer, &buffer_size, &file);
1213   if (status != 0)
1214     return syntax_error(sock,cmd);
1215
1216   get_abs_path(&file, file_tmp);
1217
1218   pthread_mutex_lock(&cache_lock);
1219   ci = g_tree_lookup(cache_tree, file);
1220   if (ci == NULL)
1221   {
1222     pthread_mutex_unlock(&cache_lock);
1223     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1224   }
1225
1226   for (size_t i=0; i < ci->values_num; i++)
1227     add_response_info(sock, "%s\n", ci->values[i]);
1228
1229   pthread_mutex_unlock(&cache_lock);
1230   return send_response(sock, RESP_OK, "updates pending\n");
1231 } /* }}} static int handle_request_pending */
1232
1233 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1234 {
1235   int status;
1236   gboolean found;
1237   char *file, file_tmp[PATH_MAX];
1238
1239   status = buffer_get_field(&buffer, &buffer_size, &file);
1240   if (status != 0)
1241     return syntax_error(sock,cmd);
1242
1243   get_abs_path(&file, file_tmp);
1244   if (!check_file_access(file, sock)) return 0;
1245
1246   pthread_mutex_lock(&cache_lock);
1247   found = g_tree_remove(cache_tree, file);
1248   pthread_mutex_unlock(&cache_lock);
1249
1250   if (found == TRUE)
1251   {
1252     if (sock != NULL)
1253       journal_write("forget", file);
1254
1255     return send_response(sock, RESP_OK, "Gone!\n");
1256   }
1257   else
1258     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1259
1260   /* NOTREACHED */
1261   assert(1==0);
1262 } /* }}} static int handle_request_forget */
1263
1264 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1265 {
1266   cache_item_t *ci;
1267
1268   pthread_mutex_lock(&cache_lock);
1269
1270   ci = cache_queue_head;
1271   while (ci != NULL)
1272   {
1273     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1274     ci = ci->next;
1275   }
1276
1277   pthread_mutex_unlock(&cache_lock);
1278
1279   return send_response(sock, RESP_OK, "in queue.\n");
1280 } /* }}} int handle_request_queue */
1281
1282 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1283 {
1284   char *file, file_tmp[PATH_MAX];
1285   int values_num = 0;
1286   int status;
1287   char orig_buf[CMD_MAX];
1288
1289   cache_item_t *ci;
1290
1291   /* save it for the journal later */
1292   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1293
1294   status = buffer_get_field (&buffer, &buffer_size, &file);
1295   if (status != 0)
1296     return syntax_error(sock,cmd);
1297
1298   pthread_mutex_lock(&stats_lock);
1299   stats_updates_received++;
1300   pthread_mutex_unlock(&stats_lock);
1301
1302   get_abs_path(&file, file_tmp);
1303   if (!check_file_access(file, sock)) return 0;
1304
1305   pthread_mutex_lock (&cache_lock);
1306   ci = g_tree_lookup (cache_tree, file);
1307
1308   if (ci == NULL) /* {{{ */
1309   {
1310     struct stat statbuf;
1311     cache_item_t *tmp;
1312
1313     /* don't hold the lock while we setup; stat(2) might block */
1314     pthread_mutex_unlock(&cache_lock);
1315
1316     memset (&statbuf, 0, sizeof (statbuf));
1317     status = stat (file, &statbuf);
1318     if (status != 0)
1319     {
1320       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1321
1322       status = errno;
1323       if (status == ENOENT)
1324         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1325       else
1326         return send_response(sock, RESP_ERR,
1327                              "stat failed with error %i.\n", status);
1328     }
1329     if (!S_ISREG (statbuf.st_mode))
1330       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1331
1332     if (access(file, R_OK|W_OK) != 0)
1333       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1334                            file, rrd_strerror(errno));
1335
1336     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1337     if (ci == NULL)
1338     {
1339       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1340
1341       return send_response(sock, RESP_ERR, "malloc failed.\n");
1342     }
1343     memset (ci, 0, sizeof (cache_item_t));
1344
1345     ci->file = strdup (file);
1346     if (ci->file == NULL)
1347     {
1348       free (ci);
1349       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1350
1351       return send_response(sock, RESP_ERR, "strdup failed.\n");
1352     }
1353
1354     wipe_ci_values(ci, now);
1355     ci->flags = CI_FLAGS_IN_TREE;
1356     pthread_cond_init(&ci->flushed, NULL);
1357
1358     pthread_mutex_lock(&cache_lock);
1359
1360     /* another UPDATE might have added this entry in the meantime */
1361     tmp = g_tree_lookup (cache_tree, file);
1362     if (tmp == NULL)
1363       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1364     else
1365     {
1366       free_cache_item (ci);
1367       ci = tmp;
1368     }
1369
1370     /* state may have changed while we were unlocked */
1371     if (state == SHUTDOWN)
1372       return -1;
1373   } /* }}} */
1374   assert (ci != NULL);
1375
1376   /* don't re-write updates in replay mode */
1377   if (sock != NULL)
1378     journal_write("update", orig_buf);
1379
1380   while (buffer_size > 0)
1381   {
1382     char *value;
1383     time_t stamp;
1384     char *eostamp;
1385
1386     status = buffer_get_field (&buffer, &buffer_size, &value);
1387     if (status != 0)
1388     {
1389       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1390       break;
1391     }
1392
1393     /* make sure update time is always moving forward */
1394     stamp = strtol(value, &eostamp, 10);
1395     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1396     {
1397       pthread_mutex_unlock(&cache_lock);
1398       return send_response(sock, RESP_ERR,
1399                            "Cannot find timestamp in '%s'!\n", value);
1400     }
1401     else if (stamp <= ci->last_update_stamp)
1402     {
1403       pthread_mutex_unlock(&cache_lock);
1404       return send_response(sock, RESP_ERR,
1405                            "illegal attempt to update using time %ld when last"
1406                            " update time is %ld (minimum one second step)\n",
1407                            stamp, ci->last_update_stamp);
1408     }
1409     else
1410       ci->last_update_stamp = stamp;
1411
1412     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1413     {
1414       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1415       continue;
1416     }
1417
1418     values_num++;
1419   }
1420
1421   if (((now - ci->last_flush_time) >= config_write_interval)
1422       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1423       && (ci->values_num > 0))
1424   {
1425     enqueue_cache_item (ci, TAIL);
1426   }
1427
1428   pthread_mutex_unlock (&cache_lock);
1429
1430   if (values_num < 1)
1431     return send_response(sock, RESP_ERR, "No values updated.\n");
1432   else
1433     return send_response(sock, RESP_OK,
1434                          "errors, enqueued %i value(s).\n", values_num);
1435
1436   /* NOTREACHED */
1437   assert(1==0);
1438
1439 } /* }}} int handle_request_update */
1440
1441 /* we came across a "WROTE" entry during journal replay.
1442  * throw away any values that we have accumulated for this file
1443  */
1444 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1445 {
1446   cache_item_t *ci;
1447   const char *file = buffer;
1448
1449   pthread_mutex_lock(&cache_lock);
1450
1451   ci = g_tree_lookup(cache_tree, file);
1452   if (ci == NULL)
1453   {
1454     pthread_mutex_unlock(&cache_lock);
1455     return (0);
1456   }
1457
1458   if (ci->values)
1459     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1460
1461   wipe_ci_values(ci, now);
1462   remove_from_queue(ci);
1463
1464   pthread_mutex_unlock(&cache_lock);
1465   return (0);
1466 } /* }}} int handle_request_wrote */
1467
1468 /* start "BATCH" processing */
1469 static int batch_start (HANDLER_PROTO) /* {{{ */
1470 {
1471   int status;
1472   if (sock->batch_start)
1473     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1474
1475   status = send_response(sock, RESP_OK,
1476                          "Go ahead.  End with dot '.' on its own line.\n");
1477   sock->batch_start = time(NULL);
1478   sock->batch_cmd = 0;
1479
1480   return status;
1481 } /* }}} static int batch_start */
1482
1483 /* finish "BATCH" processing and return results to the client */
1484 static int batch_done (HANDLER_PROTO) /* {{{ */
1485 {
1486   assert(sock->batch_start);
1487   sock->batch_start = 0;
1488   sock->batch_cmd  = 0;
1489   return send_response(sock, RESP_OK, "errors\n");
1490 } /* }}} static int batch_done */
1491
1492 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1493 {
1494   return -1;
1495 } /* }}} static int handle_request_quit */
1496
1497 static command_t list_of_commands[] = { /* {{{ */
1498   {
1499     "UPDATE",
1500     handle_request_update,
1501     CMD_CONTEXT_ANY,
1502     "UPDATE <filename> <values> [<values> ...]\n"
1503     ,
1504     "Adds the given file to the internal cache if it is not yet known and\n"
1505     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1506     "for details.\n"
1507     "\n"
1508     "Each <values> has the following form:\n"
1509     "  <values> = <time>:<value>[:<value>[...]]\n"
1510     "See the rrdupdate(1) manpage for details.\n"
1511   },
1512   {
1513     "WROTE",
1514     handle_request_wrote,
1515     CMD_CONTEXT_JOURNAL,
1516     NULL,
1517     NULL
1518   },
1519   {
1520     "FLUSH",
1521     handle_request_flush,
1522     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1523     "FLUSH <filename>\n"
1524     ,
1525     "Adds the given filename to the head of the update queue and returns\n"
1526     "after it has been dequeued.\n"
1527   },
1528   {
1529     "FLUSHALL",
1530     handle_request_flushall,
1531     CMD_CONTEXT_CLIENT,
1532     "FLUSHALL\n"
1533     ,
1534     "Triggers writing of all pending updates.  Returns immediately.\n"
1535   },
1536   {
1537     "PENDING",
1538     handle_request_pending,
1539     CMD_CONTEXT_CLIENT,
1540     "PENDING <filename>\n"
1541     ,
1542     "Shows any 'pending' updates for a file, in order.\n"
1543     "The updates shown have not yet been written to the underlying RRD file.\n"
1544   },
1545   {
1546     "FORGET",
1547     handle_request_forget,
1548     CMD_CONTEXT_ANY,
1549     "FORGET <filename>\n"
1550     ,
1551     "Removes the file completely from the cache.\n"
1552     "Any pending updates for the file will be lost.\n"
1553   },
1554   {
1555     "QUEUE",
1556     handle_request_queue,
1557     CMD_CONTEXT_CLIENT,
1558     "QUEUE\n"
1559     ,
1560         "Shows all files in the output queue.\n"
1561     "The output is zero or more lines in the following format:\n"
1562     "(where <num_vals> is the number of values to be written)\n"
1563     "\n"
1564     "<num_vals> <filename>\n"
1565   },
1566   {
1567     "STATS",
1568     handle_request_stats,
1569     CMD_CONTEXT_CLIENT,
1570     "STATS\n"
1571     ,
1572     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1573     "a description of the values.\n"
1574   },
1575   {
1576     "HELP",
1577     handle_request_help,
1578     CMD_CONTEXT_CLIENT,
1579     "HELP [<command>]\n",
1580     NULL, /* special! */
1581   },
1582   {
1583     "BATCH",
1584     batch_start,
1585     CMD_CONTEXT_CLIENT,
1586     "BATCH\n"
1587     ,
1588     "The 'BATCH' command permits the client to initiate a bulk load\n"
1589     "   of commands to rrdcached.\n"
1590     "\n"
1591     "Usage:\n"
1592     "\n"
1593     "    client: BATCH\n"
1594     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1595     "    client: command #1\n"
1596     "    client: command #2\n"
1597     "    client: ... and so on\n"
1598     "    client: .\n"
1599     "    server: 2 errors\n"
1600     "    server: 7 message for command #7\n"
1601     "    server: 9 message for command #9\n"
1602     "\n"
1603     "For more information, consult the rrdcached(1) documentation.\n"
1604   },
1605   {
1606     ".",   /* BATCH terminator */
1607     batch_done,
1608     CMD_CONTEXT_BATCH,
1609     NULL,
1610     NULL
1611   },
1612   {
1613     "QUIT",
1614     handle_request_quit,
1615     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1616     "QUIT\n"
1617     ,
1618     "Disconnect from rrdcached.\n"
1619   }
1620 }; /* }}} command_t list_of_commands[] */
1621 static size_t list_of_commands_len = sizeof (list_of_commands)
1622   / sizeof (list_of_commands[0]);
1623
1624 static command_t *find_command(char *cmd)
1625 {
1626   size_t i;
1627
1628   for (i = 0; i < list_of_commands_len; i++)
1629     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1630       return (&list_of_commands[i]);
1631   return NULL;
1632 }
1633
1634 /* We currently use the index in the `list_of_commands' array as a bit position
1635  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1636  * outside these functions so that switching to a more elegant storage method
1637  * is easily possible. */
1638 static ssize_t find_command_index (const char *cmd) /* {{{ */
1639 {
1640   size_t i;
1641
1642   for (i = 0; i < list_of_commands_len; i++)
1643     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1644       return ((ssize_t) i);
1645   return (-1);
1646 } /* }}} ssize_t find_command_index */
1647
1648 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1649     const char *cmd)
1650 {
1651   ssize_t i;
1652
1653   if (cmd == NULL)
1654     return (-1);
1655
1656   if ((strcasecmp ("QUIT", cmd) == 0)
1657       || (strcasecmp ("HELP", cmd) == 0))
1658     return (1);
1659   else if (strcmp (".", cmd) == 0)
1660     cmd = "BATCH";
1661
1662   i = find_command_index (cmd);
1663   if (i < 0)
1664     return (-1);
1665   assert (i < 32);
1666
1667   if ((sock->permissions & (1 << i)) != 0)
1668     return (1);
1669   return (0);
1670 } /* }}} int socket_permission_check */
1671
1672 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1673     const char *cmd)
1674 {
1675   ssize_t i;
1676
1677   i = find_command_index (cmd);
1678   if (i < 0)
1679     return (-1);
1680   assert (i < 32);
1681
1682   sock->permissions |= (1 << i);
1683   return (0);
1684 } /* }}} int socket_permission_add */
1685
1686 /* check whether commands are received in the expected context */
1687 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1688 {
1689   if (sock == NULL)
1690     return (cmd->context & CMD_CONTEXT_JOURNAL);
1691   else if (sock->batch_start)
1692     return (cmd->context & CMD_CONTEXT_BATCH);
1693   else
1694     return (cmd->context & CMD_CONTEXT_CLIENT);
1695
1696   /* NOTREACHED */
1697   assert(1==0);
1698 }
1699
1700 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1701 {
1702   int status;
1703   char *cmd_str;
1704   char *resp_txt;
1705   command_t *help = NULL;
1706
1707   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1708   if (status == 0)
1709     help = find_command(cmd_str);
1710
1711   if (help && (help->syntax || help->help))
1712   {
1713     char tmp[CMD_MAX];
1714
1715     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1716     resp_txt = tmp;
1717
1718     if (help->syntax)
1719       add_response_info(sock, "Usage: %s\n", help->syntax);
1720
1721     if (help->help)
1722       add_response_info(sock, "%s\n", help->help);
1723   }
1724   else
1725   {
1726     size_t i;
1727
1728     resp_txt = "Command overview\n";
1729
1730     for (i = 0; i < list_of_commands_len; i++)
1731     {
1732       if (list_of_commands[i].syntax == NULL)
1733         continue;
1734       add_response_info (sock, "%s", list_of_commands[i].syntax);
1735     }
1736   }
1737
1738   return send_response(sock, RESP_OK, resp_txt);
1739 } /* }}} int handle_request_help */
1740
1741 /* if sock==NULL, we are in journal replay mode */
1742 static int handle_request (DISPATCH_PROTO) /* {{{ */
1743 {
1744   char *buffer_ptr = buffer;
1745   char *cmd_str = NULL;
1746   command_t *cmd = NULL;
1747   int status;
1748
1749   assert (buffer[buffer_size - 1] == '\0');
1750
1751   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1752   if (status != 0)
1753   {
1754     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1755     return (-1);
1756   }
1757
1758   if (sock != NULL && sock->batch_start)
1759     sock->batch_cmd++;
1760
1761   cmd = find_command(cmd_str);
1762   if (!cmd)
1763     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1764
1765   if (!socket_permission_check (sock, cmd->cmd))
1766     return send_response(sock, RESP_ERR, "Permission denied.\n");
1767
1768   if (!command_check_context(sock, cmd))
1769     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1770
1771   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1772 } /* }}} int handle_request */
1773
1774 static void journal_set_free (journal_set *js) /* {{{ */
1775 {
1776   if (js == NULL)
1777     return;
1778
1779   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1780
1781   free(js);
1782 } /* }}} journal_set_free */
1783
1784 static void journal_set_remove (journal_set *js) /* {{{ */
1785 {
1786   if (js == NULL)
1787     return;
1788
1789   for (uint i=0; i < js->files_num; i++)
1790   {
1791     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1792     unlink(js->files[i]);
1793   }
1794 } /* }}} journal_set_remove */
1795
1796 /* close current journal file handle.
1797  * MUST hold journal_lock before calling */
1798 static void journal_close(void) /* {{{ */
1799 {
1800   if (journal_fh != NULL)
1801   {
1802     if (fclose(journal_fh) != 0)
1803       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1804   }
1805
1806   journal_fh = NULL;
1807   journal_size = 0;
1808 } /* }}} journal_close */
1809
1810 /* MUST hold journal_lock before calling */
1811 static void journal_new_file(void) /* {{{ */
1812 {
1813   struct timeval now;
1814   int  new_fd;
1815   char new_file[PATH_MAX + 1];
1816
1817   assert(journal_dir != NULL);
1818   assert(journal_cur != NULL);
1819
1820   journal_close();
1821
1822   gettimeofday(&now, NULL);
1823   /* this format assures that the files sort in strcmp() order */
1824   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1825            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1826
1827   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1828                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1829   if (new_fd < 0)
1830     goto error;
1831
1832   journal_fh = fdopen(new_fd, "a");
1833   if (journal_fh == NULL)
1834     goto error;
1835
1836   journal_size = ftell(journal_fh);
1837   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1838
1839   /* record the file in the journal set */
1840   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1841
1842   return;
1843
1844 error:
1845   RRDD_LOG(LOG_CRIT,
1846            "JOURNALING DISABLED: Error while trying to create %s : %s",
1847            new_file, rrd_strerror(errno));
1848   RRDD_LOG(LOG_CRIT,
1849            "JOURNALING DISABLED: All values will be flushed at shutdown");
1850
1851   close(new_fd);
1852   config_flush_at_shutdown = 1;
1853
1854 } /* }}} journal_new_file */
1855
1856 /* MUST NOT hold journal_lock before calling this */
1857 static void journal_rotate(void) /* {{{ */
1858 {
1859   journal_set *old_js = NULL;
1860
1861   if (journal_dir == NULL)
1862     return;
1863
1864   RRDD_LOG(LOG_DEBUG, "rotating journals");
1865
1866   pthread_mutex_lock(&stats_lock);
1867   ++stats_journal_rotate;
1868   pthread_mutex_unlock(&stats_lock);
1869
1870   pthread_mutex_lock(&journal_lock);
1871
1872   journal_close();
1873
1874   /* rotate the journal sets */
1875   old_js = journal_old;
1876   journal_old = journal_cur;
1877   journal_cur = calloc(1, sizeof(journal_set));
1878
1879   if (journal_cur != NULL)
1880     journal_new_file();
1881   else
1882     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1883
1884   pthread_mutex_unlock(&journal_lock);
1885
1886   journal_set_remove(old_js);
1887   journal_set_free  (old_js);
1888
1889 } /* }}} static void journal_rotate */
1890
1891 /* MUST hold journal_lock when calling */
1892 static void journal_done(void) /* {{{ */
1893 {
1894   if (journal_cur == NULL)
1895     return;
1896
1897   journal_close();
1898
1899   if (config_flush_at_shutdown)
1900   {
1901     RRDD_LOG(LOG_INFO, "removing journals");
1902     journal_set_remove(journal_old);
1903     journal_set_remove(journal_cur);
1904   }
1905   else
1906   {
1907     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1908              "journals will be used at next startup");
1909   }
1910
1911   journal_set_free(journal_cur);
1912   journal_set_free(journal_old);
1913   free(journal_dir);
1914
1915 } /* }}} static void journal_done */
1916
1917 static int journal_write(char *cmd, char *args) /* {{{ */
1918 {
1919   int chars;
1920
1921   if (journal_fh == NULL)
1922     return 0;
1923
1924   pthread_mutex_lock(&journal_lock);
1925   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1926   journal_size += chars;
1927
1928   if (journal_size > JOURNAL_MAX)
1929     journal_new_file();
1930
1931   pthread_mutex_unlock(&journal_lock);
1932
1933   if (chars > 0)
1934   {
1935     pthread_mutex_lock(&stats_lock);
1936     stats_journal_bytes += chars;
1937     pthread_mutex_unlock(&stats_lock);
1938   }
1939
1940   return chars;
1941 } /* }}} static int journal_write */
1942
1943 static int journal_replay (const char *file) /* {{{ */
1944 {
1945   FILE *fh;
1946   int entry_cnt = 0;
1947   int fail_cnt = 0;
1948   uint64_t line = 0;
1949   char entry[CMD_MAX];
1950   time_t now;
1951
1952   if (file == NULL) return 0;
1953
1954   {
1955     char *reason = "unknown error";
1956     int status = 0;
1957     struct stat statbuf;
1958
1959     memset(&statbuf, 0, sizeof(statbuf));
1960     if (stat(file, &statbuf) != 0)
1961     {
1962       reason = "stat error";
1963       status = errno;
1964     }
1965     else if (!S_ISREG(statbuf.st_mode))
1966     {
1967       reason = "not a regular file";
1968       status = EPERM;
1969     }
1970     if (statbuf.st_uid != daemon_uid)
1971     {
1972       reason = "not owned by daemon user";
1973       status = EACCES;
1974     }
1975     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1976     {
1977       reason = "must not be user/group writable";
1978       status = EACCES;
1979     }
1980
1981     if (status != 0)
1982     {
1983       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1984                file, rrd_strerror(status), reason);
1985       return 0;
1986     }
1987   }
1988
1989   fh = fopen(file, "r");
1990   if (fh == NULL)
1991   {
1992     if (errno != ENOENT)
1993       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1994                file, rrd_strerror(errno));
1995     return 0;
1996   }
1997   else
1998     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1999
2000   now = time(NULL);
2001
2002   while(!feof(fh))
2003   {
2004     size_t entry_len;
2005
2006     ++line;
2007     if (fgets(entry, sizeof(entry), fh) == NULL)
2008       break;
2009     entry_len = strlen(entry);
2010
2011     /* check \n termination in case journal writing crashed mid-line */
2012     if (entry_len == 0)
2013       continue;
2014     else if (entry[entry_len - 1] != '\n')
2015     {
2016       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2017       ++fail_cnt;
2018       continue;
2019     }
2020
2021     entry[entry_len - 1] = '\0';
2022
2023     if (handle_request(NULL, now, entry, entry_len) == 0)
2024       ++entry_cnt;
2025     else
2026       ++fail_cnt;
2027   }
2028
2029   fclose(fh);
2030
2031   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2032            entry_cnt, fail_cnt);
2033
2034   return entry_cnt > 0 ? 1 : 0;
2035 } /* }}} static int journal_replay */
2036
2037 static int journal_sort(const void *v1, const void *v2)
2038 {
2039   char **jn1 = (char **) v1;
2040   char **jn2 = (char **) v2;
2041
2042   return strcmp(*jn1,*jn2);
2043 }
2044
2045 static void journal_init(void) /* {{{ */
2046 {
2047   int had_journal = 0;
2048   DIR *dir;
2049   struct dirent *dent;
2050   char path[PATH_MAX+1];
2051
2052   if (journal_dir == NULL) return;
2053
2054   pthread_mutex_lock(&journal_lock);
2055
2056   journal_cur = calloc(1, sizeof(journal_set));
2057   if (journal_cur == NULL)
2058   {
2059     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2060     return;
2061   }
2062
2063   RRDD_LOG(LOG_INFO, "checking for journal files");
2064
2065   /* Handle old journal files during transition.  This gives them the
2066    * correct sort order.  TODO: remove after first release
2067    */
2068   {
2069     char old_path[PATH_MAX+1];
2070     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2071     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2072     rename(old_path, path);
2073
2074     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2075     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2076     rename(old_path, path);
2077   }
2078
2079   dir = opendir(journal_dir);
2080   while ((dent = readdir(dir)) != NULL)
2081   {
2082     /* looks like a journal file? */
2083     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2084       continue;
2085
2086     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2087
2088     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2089     {
2090       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2091                dent->d_name);
2092       break;
2093     }
2094   }
2095   closedir(dir);
2096
2097   qsort(journal_cur->files, journal_cur->files_num,
2098         sizeof(journal_cur->files[0]), journal_sort);
2099
2100   for (uint i=0; i < journal_cur->files_num; i++)
2101     had_journal += journal_replay(journal_cur->files[i]);
2102
2103   journal_new_file();
2104
2105   /* it must have been a crash.  start a flush */
2106   if (had_journal && config_flush_at_shutdown)
2107     flush_old_values(-1);
2108
2109   pthread_mutex_unlock(&journal_lock);
2110
2111   RRDD_LOG(LOG_INFO, "journal processing complete");
2112
2113 } /* }}} static void journal_init */
2114
2115 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2116 {
2117   assert(sock != NULL);
2118
2119   free(sock->rbuf);  sock->rbuf = NULL;
2120   free(sock->wbuf);  sock->wbuf = NULL;
2121   free(sock);
2122 } /* }}} void free_listen_socket */
2123
2124 static void close_connection(listen_socket_t *sock) /* {{{ */
2125 {
2126   if (sock->fd >= 0)
2127   {
2128     close(sock->fd);
2129     sock->fd = -1;
2130   }
2131
2132   free_listen_socket(sock);
2133
2134 } /* }}} void close_connection */
2135
2136 static void *connection_thread_main (void *args) /* {{{ */
2137 {
2138   listen_socket_t *sock;
2139   int fd;
2140
2141   sock = (listen_socket_t *) args;
2142   fd = sock->fd;
2143
2144   /* init read buffers */
2145   sock->next_read = sock->next_cmd = 0;
2146   sock->rbuf = malloc(RBUF_SIZE);
2147   if (sock->rbuf == NULL)
2148   {
2149     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2150     close_connection(sock);
2151     return NULL;
2152   }
2153
2154   pthread_mutex_lock (&connection_threads_lock);
2155   connection_threads_num++;
2156   pthread_mutex_unlock (&connection_threads_lock);
2157
2158   while (state == RUNNING)
2159   {
2160     char *cmd;
2161     ssize_t cmd_len;
2162     ssize_t rbytes;
2163     time_t now;
2164
2165     struct pollfd pollfd;
2166     int status;
2167
2168     pollfd.fd = fd;
2169     pollfd.events = POLLIN | POLLPRI;
2170     pollfd.revents = 0;
2171
2172     status = poll (&pollfd, 1, /* timeout = */ 500);
2173     if (state != RUNNING)
2174       break;
2175     else if (status == 0) /* timeout */
2176       continue;
2177     else if (status < 0) /* error */
2178     {
2179       status = errno;
2180       if (status != EINTR)
2181         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2182       continue;
2183     }
2184
2185     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2186       break;
2187     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2188     {
2189       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2190           "poll(2) returned something unexpected: %#04hx",
2191           pollfd.revents);
2192       break;
2193     }
2194
2195     rbytes = read(fd, sock->rbuf + sock->next_read,
2196                   RBUF_SIZE - sock->next_read);
2197     if (rbytes < 0)
2198     {
2199       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2200       break;
2201     }
2202     else if (rbytes == 0)
2203       break; /* eof */
2204
2205     sock->next_read += rbytes;
2206
2207     if (sock->batch_start)
2208       now = sock->batch_start;
2209     else
2210       now = time(NULL);
2211
2212     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2213     {
2214       status = handle_request (sock, now, cmd, cmd_len+1);
2215       if (status != 0)
2216         goto out_close;
2217     }
2218   }
2219
2220 out_close:
2221   close_connection(sock);
2222
2223   /* Remove this thread from the connection threads list */
2224   pthread_mutex_lock (&connection_threads_lock);
2225   connection_threads_num--;
2226   if (connection_threads_num <= 0)
2227     pthread_cond_broadcast(&connection_threads_done);
2228   pthread_mutex_unlock (&connection_threads_lock);
2229
2230   return (NULL);
2231 } /* }}} void *connection_thread_main */
2232
2233 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2234 {
2235   int fd;
2236   struct sockaddr_un sa;
2237   listen_socket_t *temp;
2238   int status;
2239   const char *path;
2240
2241   path = sock->addr;
2242   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2243     path += strlen("unix:");
2244
2245   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2246       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2247   if (temp == NULL)
2248   {
2249     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2250     return (-1);
2251   }
2252   listen_fds = temp;
2253   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2254
2255   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2256   if (fd < 0)
2257   {
2258     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2259              rrd_strerror(errno));
2260     return (-1);
2261   }
2262
2263   memset (&sa, 0, sizeof (sa));
2264   sa.sun_family = AF_UNIX;
2265   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2266
2267   /* if we've gotten this far, we own the pid file.  any daemon started
2268    * with the same args must not be alive.  therefore, ensure that we can
2269    * create the socket...
2270    */
2271   unlink(path);
2272
2273   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2274   if (status != 0)
2275   {
2276     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2277              path, rrd_strerror(errno));
2278     close (fd);
2279     return (-1);
2280   }
2281
2282   status = listen (fd, /* backlog = */ 10);
2283   if (status != 0)
2284   {
2285     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2286              path, rrd_strerror(errno));
2287     close (fd);
2288     unlink (path);
2289     return (-1);
2290   }
2291
2292   listen_fds[listen_fds_num].fd = fd;
2293   listen_fds[listen_fds_num].family = PF_UNIX;
2294   strncpy(listen_fds[listen_fds_num].addr, path,
2295           sizeof (listen_fds[listen_fds_num].addr) - 1);
2296   listen_fds_num++;
2297
2298   return (0);
2299 } /* }}} int open_listen_socket_unix */
2300
2301 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2302 {
2303   struct addrinfo ai_hints;
2304   struct addrinfo *ai_res;
2305   struct addrinfo *ai_ptr;
2306   char addr_copy[NI_MAXHOST];
2307   char *addr;
2308   char *port;
2309   int status;
2310
2311   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2312   addr_copy[sizeof (addr_copy) - 1] = 0;
2313   addr = addr_copy;
2314
2315   memset (&ai_hints, 0, sizeof (ai_hints));
2316   ai_hints.ai_flags = 0;
2317 #ifdef AI_ADDRCONFIG
2318   ai_hints.ai_flags |= AI_ADDRCONFIG;
2319 #endif
2320   ai_hints.ai_family = AF_UNSPEC;
2321   ai_hints.ai_socktype = SOCK_STREAM;
2322
2323   port = NULL;
2324   if (*addr == '[') /* IPv6+port format */
2325   {
2326     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2327     addr++;
2328
2329     port = strchr (addr, ']');
2330     if (port == NULL)
2331     {
2332       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2333       return (-1);
2334     }
2335     *port = 0;
2336     port++;
2337
2338     if (*port == ':')
2339       port++;
2340     else if (*port == 0)
2341       port = NULL;
2342     else
2343     {
2344       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2345       return (-1);
2346     }
2347   } /* if (*addr = ']') */
2348   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2349   {
2350     port = rindex(addr, ':');
2351     if (port != NULL)
2352     {
2353       *port = 0;
2354       port++;
2355     }
2356   }
2357   ai_res = NULL;
2358   status = getaddrinfo (addr,
2359                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2360                         &ai_hints, &ai_res);
2361   if (status != 0)
2362   {
2363     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2364              addr, gai_strerror (status));
2365     return (-1);
2366   }
2367
2368   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2369   {
2370     int fd;
2371     listen_socket_t *temp;
2372     int one = 1;
2373
2374     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2375         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2376     if (temp == NULL)
2377     {
2378       fprintf (stderr,
2379                "rrdcached: open_listen_socket_network: realloc failed.\n");
2380       continue;
2381     }
2382     listen_fds = temp;
2383     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2384
2385     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2386     if (fd < 0)
2387     {
2388       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2389                rrd_strerror(errno));
2390       continue;
2391     }
2392
2393     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2394
2395     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2396     if (status != 0)
2397     {
2398       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2399                sock->addr, rrd_strerror(errno));
2400       close (fd);
2401       continue;
2402     }
2403
2404     status = listen (fd, /* backlog = */ 10);
2405     if (status != 0)
2406     {
2407       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2408                sock->addr, rrd_strerror(errno));
2409       close (fd);
2410       freeaddrinfo(ai_res);
2411       return (-1);
2412     }
2413
2414     listen_fds[listen_fds_num].fd = fd;
2415     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2416     listen_fds_num++;
2417   } /* for (ai_ptr) */
2418
2419   freeaddrinfo(ai_res);
2420   return (0);
2421 } /* }}} static int open_listen_socket_network */
2422
2423 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2424 {
2425   assert(sock != NULL);
2426   assert(sock->addr != NULL);
2427
2428   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2429       || sock->addr[0] == '/')
2430     return (open_listen_socket_unix(sock));
2431   else
2432     return (open_listen_socket_network(sock));
2433 } /* }}} int open_listen_socket */
2434
2435 static int close_listen_sockets (void) /* {{{ */
2436 {
2437   size_t i;
2438
2439   for (i = 0; i < listen_fds_num; i++)
2440   {
2441     close (listen_fds[i].fd);
2442
2443     if (listen_fds[i].family == PF_UNIX)
2444       unlink(listen_fds[i].addr);
2445   }
2446
2447   free (listen_fds);
2448   listen_fds = NULL;
2449   listen_fds_num = 0;
2450
2451   return (0);
2452 } /* }}} int close_listen_sockets */
2453
2454 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2455 {
2456   struct pollfd *pollfds;
2457   int pollfds_num;
2458   int status;
2459   int i;
2460
2461   if (listen_fds_num < 1)
2462   {
2463     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2464     return (NULL);
2465   }
2466
2467   pollfds_num = listen_fds_num;
2468   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2469   if (pollfds == NULL)
2470   {
2471     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2472     return (NULL);
2473   }
2474   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2475
2476   RRDD_LOG(LOG_INFO, "listening for connections");
2477
2478   while (state == RUNNING)
2479   {
2480     for (i = 0; i < pollfds_num; i++)
2481     {
2482       pollfds[i].fd = listen_fds[i].fd;
2483       pollfds[i].events = POLLIN | POLLPRI;
2484       pollfds[i].revents = 0;
2485     }
2486
2487     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2488     if (state != RUNNING)
2489       break;
2490     else if (status == 0) /* timeout */
2491       continue;
2492     else if (status < 0) /* error */
2493     {
2494       status = errno;
2495       if (status != EINTR)
2496       {
2497         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2498       }
2499       continue;
2500     }
2501
2502     for (i = 0; i < pollfds_num; i++)
2503     {
2504       listen_socket_t *client_sock;
2505       struct sockaddr_storage client_sa;
2506       socklen_t client_sa_size;
2507       pthread_t tid;
2508       pthread_attr_t attr;
2509
2510       if (pollfds[i].revents == 0)
2511         continue;
2512
2513       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2514       {
2515         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2516             "poll(2) returned something unexpected for listen FD #%i.",
2517             pollfds[i].fd);
2518         continue;
2519       }
2520
2521       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2522       if (client_sock == NULL)
2523       {
2524         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2525         continue;
2526       }
2527       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2528
2529       client_sa_size = sizeof (client_sa);
2530       client_sock->fd = accept (pollfds[i].fd,
2531           (struct sockaddr *) &client_sa, &client_sa_size);
2532       if (client_sock->fd < 0)
2533       {
2534         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2535         free(client_sock);
2536         continue;
2537       }
2538
2539       pthread_attr_init (&attr);
2540       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2541
2542       status = pthread_create (&tid, &attr, connection_thread_main,
2543                                client_sock);
2544       if (status != 0)
2545       {
2546         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2547         close_connection(client_sock);
2548         continue;
2549       }
2550     } /* for (pollfds_num) */
2551   } /* while (state == RUNNING) */
2552
2553   RRDD_LOG(LOG_INFO, "starting shutdown");
2554
2555   close_listen_sockets ();
2556
2557   pthread_mutex_lock (&connection_threads_lock);
2558   while (connection_threads_num > 0)
2559     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2560   pthread_mutex_unlock (&connection_threads_lock);
2561
2562   free(pollfds);
2563
2564   return (NULL);
2565 } /* }}} void *listen_thread_main */
2566
2567 static int daemonize (void) /* {{{ */
2568 {
2569   int pid_fd;
2570   char *base_dir;
2571
2572   daemon_uid = geteuid();
2573
2574   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2575   if (pid_fd < 0)
2576     pid_fd = check_pidfile();
2577   if (pid_fd < 0)
2578     return pid_fd;
2579
2580   /* open all the listen sockets */
2581   if (config_listen_address_list_len > 0)
2582   {
2583     for (size_t i = 0; i < config_listen_address_list_len; i++)
2584       open_listen_socket (config_listen_address_list[i]);
2585
2586     rrd_free_ptrs((void ***) &config_listen_address_list,
2587                   &config_listen_address_list_len);
2588   }
2589   else
2590   {
2591     listen_socket_t sock;
2592     memset(&sock, 0, sizeof(sock));
2593     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2594     open_listen_socket (&sock);
2595   }
2596
2597   if (listen_fds_num < 1)
2598   {
2599     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2600     goto error;
2601   }
2602
2603   if (!stay_foreground)
2604   {
2605     pid_t child;
2606
2607     child = fork ();
2608     if (child < 0)
2609     {
2610       fprintf (stderr, "daemonize: fork(2) failed.\n");
2611       goto error;
2612     }
2613     else if (child > 0)
2614       exit(0);
2615
2616     /* Become session leader */
2617     setsid ();
2618
2619     /* Open the first three file descriptors to /dev/null */
2620     close (2);
2621     close (1);
2622     close (0);
2623
2624     open ("/dev/null", O_RDWR);
2625     if (dup(0) == -1 || dup(0) == -1){
2626         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2627     }
2628   } /* if (!stay_foreground) */
2629
2630   /* Change into the /tmp directory. */
2631   base_dir = (config_base_dir != NULL)
2632     ? config_base_dir
2633     : "/tmp";
2634
2635   if (chdir (base_dir) != 0)
2636   {
2637     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2638     goto error;
2639   }
2640
2641   install_signal_handlers();
2642
2643   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2644   RRDD_LOG(LOG_INFO, "starting up");
2645
2646   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2647                                 (GDestroyNotify) free_cache_item);
2648   if (cache_tree == NULL)
2649   {
2650     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2651     goto error;
2652   }
2653
2654   return write_pidfile (pid_fd);
2655
2656 error:
2657   remove_pidfile();
2658   return -1;
2659 } /* }}} int daemonize */
2660
2661 static int cleanup (void) /* {{{ */
2662 {
2663   pthread_cond_broadcast (&flush_cond);
2664   pthread_join (flush_thread, NULL);
2665
2666   pthread_cond_broadcast (&queue_cond);
2667   for (int i = 0; i < config_queue_threads; i++)
2668     pthread_join (queue_threads[i], NULL);
2669
2670   if (config_flush_at_shutdown)
2671   {
2672     assert(cache_queue_head == NULL);
2673     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2674   }
2675
2676   free(queue_threads);
2677   free(config_base_dir);
2678   free(config_pid_file);
2679
2680   pthread_mutex_lock(&cache_lock);
2681   g_tree_destroy(cache_tree);
2682
2683   pthread_mutex_lock(&journal_lock);
2684   journal_done();
2685
2686   RRDD_LOG(LOG_INFO, "goodbye");
2687   closelog ();
2688
2689   remove_pidfile ();
2690
2691   return (0);
2692 } /* }}} int cleanup */
2693
2694 static int read_options (int argc, char **argv) /* {{{ */
2695 {
2696   int option;
2697   int status = 0;
2698
2699   char **permissions = NULL;
2700   size_t permissions_len = 0;
2701
2702   while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2703   {
2704     switch (option)
2705     {
2706       case 'g':
2707         stay_foreground=1;
2708         break;
2709
2710       case 'l':
2711       {
2712         listen_socket_t *new;
2713
2714         new = malloc(sizeof(listen_socket_t));
2715         if (new == NULL)
2716         {
2717           fprintf(stderr, "read_options: malloc failed.\n");
2718           return(2);
2719         }
2720         memset(new, 0, sizeof(listen_socket_t));
2721
2722         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2723
2724         /* Add permissions to the socket {{{ */
2725         if (permissions_len != 0)
2726         {
2727           size_t i;
2728           for (i = 0; i < permissions_len; i++)
2729           {
2730             status = socket_permission_add (new, permissions[i]);
2731             if (status != 0)
2732             {
2733               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2734                   "socket failed. Most likely, this permission doesn't "
2735                   "exist. Check your command line.\n", permissions[i]);
2736               status = 4;
2737             }
2738           }
2739         }
2740         else /* if (permissions_len == 0) */
2741         {
2742           /* Add permission for ALL commands to the socket. */
2743           size_t i;
2744           for (i = 0; i < list_of_commands_len; i++)
2745           {
2746             status = socket_permission_add (new, list_of_commands[i].cmd);
2747             if (status != 0)
2748             {
2749               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2750                   "socket failed. This should never happen, ever! Sorry.\n",
2751                   permissions[i]);
2752               status = 4;
2753             }
2754           }
2755         }
2756         /* }}} Done adding permissions. */
2757
2758         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2759                          &config_listen_address_list_len, new))
2760         {
2761           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2762           return (2);
2763         }
2764       }
2765       break;
2766
2767       case 'P':
2768       {
2769         char *optcopy;
2770         char *saveptr;
2771         char *dummy;
2772         char *ptr;
2773
2774         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2775
2776         optcopy = strdup (optarg);
2777         dummy = optcopy;
2778         saveptr = NULL;
2779         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2780         {
2781           dummy = NULL;
2782           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2783         }
2784
2785         free (optcopy);
2786       }
2787       break;
2788
2789       case 'f':
2790       {
2791         int temp;
2792
2793         temp = atoi (optarg);
2794         if (temp > 0)
2795           config_flush_interval = temp;
2796         else
2797         {
2798           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2799           status = 3;
2800         }
2801       }
2802       break;
2803
2804       case 'w':
2805       {
2806         int temp;
2807
2808         temp = atoi (optarg);
2809         if (temp > 0)
2810           config_write_interval = temp;
2811         else
2812         {
2813           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2814           status = 2;
2815         }
2816       }
2817       break;
2818
2819       case 'z':
2820       {
2821         int temp;
2822
2823         temp = atoi(optarg);
2824         if (temp > 0)
2825           config_write_jitter = temp;
2826         else
2827         {
2828           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2829           status = 2;
2830         }
2831
2832         break;
2833       }
2834
2835       case 't':
2836       {
2837         int threads;
2838         threads = atoi(optarg);
2839         if (threads >= 1)
2840           config_queue_threads = threads;
2841         else
2842         {
2843           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2844           return 1;
2845         }
2846       }
2847       break;
2848
2849       case 'B':
2850         config_write_base_only = 1;
2851         break;
2852
2853       case 'b':
2854       {
2855         size_t len;
2856         char base_realpath[PATH_MAX];
2857
2858         if (config_base_dir != NULL)
2859           free (config_base_dir);
2860         config_base_dir = strdup (optarg);
2861         if (config_base_dir == NULL)
2862         {
2863           fprintf (stderr, "read_options: strdup failed.\n");
2864           return (3);
2865         }
2866
2867         /* make sure that the base directory is not resolved via
2868          * symbolic links.  this makes some performance-enhancing
2869          * assumptions possible (we don't have to resolve paths
2870          * that start with a "/")
2871          */
2872         if (realpath(config_base_dir, base_realpath) == NULL)
2873         {
2874           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2875           return 5;
2876         }
2877         else if (strncmp(config_base_dir,
2878                          base_realpath, sizeof(base_realpath)) != 0)
2879         {
2880           fprintf(stderr,
2881                   "Base directory (-b) resolved via file system links!\n"
2882                   "Please consult rrdcached '-b' documentation!\n"
2883                   "Consider specifying the real directory (%s)\n",
2884                   base_realpath);
2885           return 5;
2886         }
2887
2888         len = strlen (config_base_dir);
2889         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2890         {
2891           config_base_dir[len - 1] = 0;
2892           len--;
2893         }
2894
2895         if (len < 1)
2896         {
2897           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2898           return (4);
2899         }
2900
2901         _config_base_dir_len = len;
2902       }
2903       break;
2904
2905       case 'p':
2906       {
2907         if (config_pid_file != NULL)
2908           free (config_pid_file);
2909         config_pid_file = strdup (optarg);
2910         if (config_pid_file == NULL)
2911         {
2912           fprintf (stderr, "read_options: strdup failed.\n");
2913           return (3);
2914         }
2915       }
2916       break;
2917
2918       case 'F':
2919         config_flush_at_shutdown = 1;
2920         break;
2921
2922       case 'j':
2923       {
2924         struct stat statbuf;
2925         const char *dir = journal_dir = strdup(optarg);
2926
2927         status = stat(dir, &statbuf);
2928         if (status != 0)
2929         {
2930           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2931           return 6;
2932         }
2933
2934         if (!S_ISDIR(statbuf.st_mode)
2935             || access(dir, R_OK|W_OK|X_OK) != 0)
2936         {
2937           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2938                   errno ? rrd_strerror(errno) : "");
2939           return 6;
2940         }
2941       }
2942       break;
2943
2944       case 'h':
2945       case '?':
2946         printf ("RRDCacheD %s\n"
2947             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
2948             "\n"
2949             "Usage: rrdcached [options]\n"
2950             "\n"
2951             "Valid options are:\n"
2952             "  -l <address>  Socket address to listen to.\n"
2953             "  -P <perms>    Sets the permissions to assign to all following "
2954                             "sockets\n"
2955             "  -w <seconds>  Interval in which to write data.\n"
2956             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2957             "  -t <threads>  Number of write threads.\n"
2958             "  -f <seconds>  Interval in which to flush dead data.\n"
2959             "  -p <file>     Location of the PID-file.\n"
2960             "  -b <dir>      Base directory to change to.\n"
2961             "  -B            Restrict file access to paths within -b <dir>\n"
2962             "  -g            Do not fork and run in the foreground.\n"
2963             "  -j <dir>      Directory in which to create the journal files.\n"
2964             "  -F            Always flush all updates at shutdown\n"
2965             "\n"
2966             "For more information and a detailed description of all options "
2967             "please refer\n"
2968             "to the rrdcached(1) manual page.\n",
2969             VERSION);
2970         status = -1;
2971         break;
2972     } /* switch (option) */
2973   } /* while (getopt) */
2974
2975   /* advise the user when values are not sane */
2976   if (config_flush_interval < 2 * config_write_interval)
2977     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2978             " 2x write interval (-w) !\n");
2979   if (config_write_jitter > config_write_interval)
2980     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2981             " write interval (-w) !\n");
2982
2983   if (config_write_base_only && config_base_dir == NULL)
2984     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2985             "  Consult the rrdcached documentation\n");
2986
2987   if (journal_dir == NULL)
2988     config_flush_at_shutdown = 1;
2989
2990   rrd_free_ptrs ((void *) &permissions, &permissions_len);
2991
2992   return (status);
2993 } /* }}} int read_options */
2994
2995 int main (int argc, char **argv)
2996 {
2997   int status;
2998
2999   status = read_options (argc, argv);
3000   if (status != 0)
3001   {
3002     if (status < 0)
3003       status = 0;
3004     return (status);
3005   }
3006
3007   status = daemonize ();
3008   if (status != 0)
3009   {
3010     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3011     return (1);
3012   }
3013
3014   journal_init();
3015
3016   /* start the queue threads */
3017   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3018   if (queue_threads == NULL)
3019   {
3020     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3021     cleanup();
3022     return (1);
3023   }
3024   for (int i = 0; i < config_queue_threads; i++)
3025   {
3026     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3027     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3028     if (status != 0)
3029     {
3030       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3031       cleanup();
3032       return (1);
3033     }
3034   }
3035
3036   /* start the flush thread */
3037   memset(&flush_thread, 0, sizeof(flush_thread));
3038   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3039   if (status != 0)
3040   {
3041     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3042     cleanup();
3043     return (1);
3044   }
3045
3046   listen_thread_main (NULL);
3047   cleanup ();
3048
3049   return (0);
3050 } /* int main */
3051
3052 /*
3053  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3054  */