rrdcached: Create the base directory on startup. -- Sebastian Harl
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109
110 #include <glib-2.0/glib.h>
111 /* }}} */
112
113 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
114
115 #ifndef __GNUC__
116 # define __attribute__(x) /**/
117 #endif
118
119 /*
120  * Types
121  */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
123
124 struct listen_socket_s
125 {
126   int fd;
127   char addr[PATH_MAX + 1];
128   int family;
129
130   /* state for BATCH processing */
131   time_t batch_start;
132   int batch_cmd;
133
134   /* buffered IO */
135   char *rbuf;
136   off_t next_cmd;
137   off_t next_read;
138
139   char *wbuf;
140   ssize_t wbuf_len;
141
142   uint32_t permissions;
143 };
144 typedef struct listen_socket_s listen_socket_t;
145
146 struct command_s;
147 typedef struct command_s command_t;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command_s {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160
161   char  context;                /* where we expect to see it */
162 #define CMD_CONTEXT_CLIENT      (1<<0)
163 #define CMD_CONTEXT_BATCH       (1<<1)
164 #define CMD_CONTEXT_JOURNAL     (1<<2)
165 #define CMD_CONTEXT_ANY         (0x7f)
166
167   char *syntax;
168   char *help;
169 };
170
171 struct cache_item_s;
172 typedef struct cache_item_s cache_item_t;
173 struct cache_item_s
174 {
175   char *file;
176   char **values;
177   size_t values_num;
178   time_t last_flush_time;
179   time_t last_update_stamp;
180 #define CI_FLAGS_IN_TREE  (1<<0)
181 #define CI_FLAGS_IN_QUEUE (1<<1)
182   int flags;
183   pthread_cond_t  flushed;
184   cache_item_t *prev;
185   cache_item_t *next;
186 };
187
188 struct callback_flush_data_s
189 {
190   time_t now;
191   time_t abs_timeout;
192   char **keys;
193   size_t keys_num;
194 };
195 typedef struct callback_flush_data_s callback_flush_data_t;
196
197 enum queue_side_e
198 {
199   HEAD,
200   TAIL
201 };
202 typedef enum queue_side_e queue_side_t;
203
204 /* describe a set of journal files */
205 typedef struct {
206   char **files;
207   size_t files_num;
208 } journal_set;
209
210 /* max length of socket command or response */
211 #define CMD_MAX 4096
212 #define RBUF_SIZE (CMD_MAX*2)
213
214 /*
215  * Variables
216  */
217 static int stay_foreground = 0;
218 static uid_t daemon_uid;
219
220 static listen_socket_t *listen_fds = NULL;
221 static size_t listen_fds_num = 0;
222
223 enum {
224   RUNNING,              /* normal operation */
225   FLUSHING,             /* flushing remaining values */
226   SHUTDOWN              /* shutting down */
227 } state = RUNNING;
228
229 static pthread_t *queue_threads;
230 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
231 static int config_queue_threads = 4;
232
233 static pthread_t flush_thread;
234 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
235
236 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
237 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
238 static int connection_threads_num = 0;
239
240 /* Cache stuff */
241 static GTree          *cache_tree = NULL;
242 static cache_item_t   *cache_queue_head = NULL;
243 static cache_item_t   *cache_queue_tail = NULL;
244 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
245
246 static int config_write_interval = 300;
247 static int config_write_jitter   = 0;
248 static int config_flush_interval = 3600;
249 static int config_flush_at_shutdown = 0;
250 static char *config_pid_file = NULL;
251 static char *config_base_dir = NULL;
252 static size_t _config_base_dir_len = 0;
253 static int config_write_base_only = 0;
254
255 static listen_socket_t **config_listen_address_list = NULL;
256 static size_t config_listen_address_list_len = 0;
257
258 static uint64_t stats_queue_length = 0;
259 static uint64_t stats_updates_received = 0;
260 static uint64_t stats_flush_received = 0;
261 static uint64_t stats_updates_written = 0;
262 static uint64_t stats_data_sets_written = 0;
263 static uint64_t stats_journal_bytes = 0;
264 static uint64_t stats_journal_rotate = 0;
265 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
266
267 /* Journaled updates */
268 #define JOURNAL_BASE "rrd.journal"
269 static journal_set *journal_cur = NULL;
270 static journal_set *journal_old = NULL;
271 static char *journal_dir = NULL;
272 static FILE *journal_fh = NULL;         /* current journal file handle */
273 static long  journal_size = 0;          /* current journal size */
274 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
275 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
276 static int journal_write(char *cmd, char *args);
277 static void journal_done(void);
278 static void journal_rotate(void);
279
280 /* prototypes for forward refernces */
281 static int handle_request_help (HANDLER_PROTO);
282
283 /* 
284  * Functions
285  */
286 static void sig_common (const char *sig) /* {{{ */
287 {
288   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
289   state = FLUSHING;
290   pthread_cond_broadcast(&flush_cond);
291   pthread_cond_broadcast(&queue_cond);
292 } /* }}} void sig_common */
293
294 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
295 {
296   sig_common("INT");
297 } /* }}} void sig_int_handler */
298
299 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
300 {
301   sig_common("TERM");
302 } /* }}} void sig_term_handler */
303
304 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
305 {
306   config_flush_at_shutdown = 1;
307   sig_common("USR1");
308 } /* }}} void sig_usr1_handler */
309
310 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
311 {
312   config_flush_at_shutdown = 0;
313   sig_common("USR2");
314 } /* }}} void sig_usr2_handler */
315
316 static void install_signal_handlers(void) /* {{{ */
317 {
318   /* These structures are static, because `sigaction' behaves weird if the are
319    * overwritten.. */
320   static struct sigaction sa_int;
321   static struct sigaction sa_term;
322   static struct sigaction sa_pipe;
323   static struct sigaction sa_usr1;
324   static struct sigaction sa_usr2;
325
326   /* Install signal handlers */
327   memset (&sa_int, 0, sizeof (sa_int));
328   sa_int.sa_handler = sig_int_handler;
329   sigaction (SIGINT, &sa_int, NULL);
330
331   memset (&sa_term, 0, sizeof (sa_term));
332   sa_term.sa_handler = sig_term_handler;
333   sigaction (SIGTERM, &sa_term, NULL);
334
335   memset (&sa_pipe, 0, sizeof (sa_pipe));
336   sa_pipe.sa_handler = SIG_IGN;
337   sigaction (SIGPIPE, &sa_pipe, NULL);
338
339   memset (&sa_pipe, 0, sizeof (sa_usr1));
340   sa_usr1.sa_handler = sig_usr1_handler;
341   sigaction (SIGUSR1, &sa_usr1, NULL);
342
343   memset (&sa_usr2, 0, sizeof (sa_usr2));
344   sa_usr2.sa_handler = sig_usr2_handler;
345   sigaction (SIGUSR2, &sa_usr2, NULL);
346
347 } /* }}} void install_signal_handlers */
348
349 static int open_pidfile(char *action, int oflag) /* {{{ */
350 {
351   int fd;
352   const char *file;
353   char *file_copy, *dir;
354
355   file = (config_pid_file != NULL)
356     ? config_pid_file
357     : LOCALSTATEDIR "/run/rrdcached.pid";
358
359   /* dirname may modify its argument */
360   file_copy = strdup(file);
361   if (file_copy == NULL)
362   {
363     fprintf(stderr, "rrdcached: strdup(): %s\n",
364         rrd_strerror(errno));
365     return -1;
366   }
367
368   dir = dirname(file_copy);
369   if (rrd_mkdir_p(dir, 0777) != 0)
370   {
371     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
372         dir, rrd_strerror(errno));
373     return -1;
374   }
375
376   free(file_copy);
377
378   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
379   if (fd < 0)
380     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
381             action, file, rrd_strerror(errno));
382
383   return(fd);
384 } /* }}} static int open_pidfile */
385
386 /* check existing pid file to see whether a daemon is running */
387 static int check_pidfile(void)
388 {
389   int pid_fd;
390   pid_t pid;
391   char pid_str[16];
392
393   pid_fd = open_pidfile("open", O_RDWR);
394   if (pid_fd < 0)
395     return pid_fd;
396
397   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
398     return -1;
399
400   pid = atoi(pid_str);
401   if (pid <= 0)
402     return -1;
403
404   /* another running process that we can signal COULD be
405    * a competing rrdcached */
406   if (pid != getpid() && kill(pid, 0) == 0)
407   {
408     fprintf(stderr,
409             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
410     close(pid_fd);
411     return -1;
412   }
413
414   lseek(pid_fd, 0, SEEK_SET);
415   if (ftruncate(pid_fd, 0) == -1)
416   {
417     fprintf(stderr,
418             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
419     close(pid_fd);
420     return -1;
421   }
422
423   fprintf(stderr,
424           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
425           "rrdcached: starting normally.\n", pid);
426
427   return pid_fd;
428 } /* }}} static int check_pidfile */
429
430 static int write_pidfile (int fd) /* {{{ */
431 {
432   pid_t pid;
433   FILE *fh;
434
435   pid = getpid ();
436
437   fh = fdopen (fd, "w");
438   if (fh == NULL)
439   {
440     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
441     close(fd);
442     return (-1);
443   }
444
445   fprintf (fh, "%i\n", (int) pid);
446   fclose (fh);
447
448   return (0);
449 } /* }}} int write_pidfile */
450
451 static int remove_pidfile (void) /* {{{ */
452 {
453   char *file;
454   int status;
455
456   file = (config_pid_file != NULL)
457     ? config_pid_file
458     : LOCALSTATEDIR "/run/rrdcached.pid";
459
460   status = unlink (file);
461   if (status == 0)
462     return (0);
463   return (errno);
464 } /* }}} int remove_pidfile */
465
466 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
467 {
468   char *eol;
469
470   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
471                sock->next_read - sock->next_cmd);
472
473   if (eol == NULL)
474   {
475     /* no commands left, move remainder back to front of rbuf */
476     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
477             sock->next_read - sock->next_cmd);
478     sock->next_read -= sock->next_cmd;
479     sock->next_cmd = 0;
480     *len = 0;
481     return NULL;
482   }
483   else
484   {
485     char *cmd = sock->rbuf + sock->next_cmd;
486     *eol = '\0';
487
488     sock->next_cmd = eol - sock->rbuf + 1;
489
490     if (eol > sock->rbuf && *(eol-1) == '\r')
491       *(--eol) = '\0'; /* handle "\r\n" EOL */
492
493     *len = eol - cmd;
494
495     return cmd;
496   }
497
498   /* NOTREACHED */
499   assert(1==0);
500 } /* }}} char *next_cmd */
501
502 /* add the characters directly to the write buffer */
503 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
504 {
505   char *new_buf;
506
507   assert(sock != NULL);
508
509   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
510   if (new_buf == NULL)
511   {
512     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
513     return -1;
514   }
515
516   strncpy(new_buf + sock->wbuf_len, str, len + 1);
517
518   sock->wbuf = new_buf;
519   sock->wbuf_len += len;
520
521   return 0;
522 } /* }}} static int add_to_wbuf */
523
524 /* add the text to the "extra" info that's sent after the status line */
525 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
526 {
527   va_list argp;
528   char buffer[CMD_MAX];
529   int len;
530
531   if (sock == NULL) return 0; /* journal replay mode */
532   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
533
534   va_start(argp, fmt);
535 #ifdef HAVE_VSNPRINTF
536   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
537 #else
538   len = vsprintf(buffer, fmt, argp);
539 #endif
540   va_end(argp);
541   if (len < 0)
542   {
543     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
544     return -1;
545   }
546
547   return add_to_wbuf(sock, buffer, len);
548 } /* }}} static int add_response_info */
549
550 static int count_lines(char *str) /* {{{ */
551 {
552   int lines = 0;
553
554   if (str != NULL)
555   {
556     while ((str = strchr(str, '\n')) != NULL)
557     {
558       ++lines;
559       ++str;
560     }
561   }
562
563   return lines;
564 } /* }}} static int count_lines */
565
566 /* send the response back to the user.
567  * returns 0 on success, -1 on error
568  * write buffer is always zeroed after this call */
569 static int send_response (listen_socket_t *sock, response_code rc,
570                           char *fmt, ...) /* {{{ */
571 {
572   va_list argp;
573   char buffer[CMD_MAX];
574   int lines;
575   ssize_t wrote;
576   int rclen, len;
577
578   if (sock == NULL) return rc;  /* journal replay mode */
579
580   if (sock->batch_start)
581   {
582     if (rc == RESP_OK)
583       return rc; /* no response on success during BATCH */
584     lines = sock->batch_cmd;
585   }
586   else if (rc == RESP_OK)
587     lines = count_lines(sock->wbuf);
588   else
589     lines = -1;
590
591   rclen = sprintf(buffer, "%d ", lines);
592   va_start(argp, fmt);
593 #ifdef HAVE_VSNPRINTF
594   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
595 #else
596   len = vsprintf(buffer+rclen, fmt, argp);
597 #endif
598   va_end(argp);
599   if (len < 0)
600     return -1;
601
602   len += rclen;
603
604   /* append the result to the wbuf, don't write to the user */
605   if (sock->batch_start)
606     return add_to_wbuf(sock, buffer, len);
607
608   /* first write must be complete */
609   if (len != write(sock->fd, buffer, len))
610   {
611     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
612     return -1;
613   }
614
615   if (sock->wbuf != NULL && rc == RESP_OK)
616   {
617     wrote = 0;
618     while (wrote < sock->wbuf_len)
619     {
620       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
621       if (wb <= 0)
622       {
623         RRDD_LOG(LOG_INFO, "send_response: could not write results");
624         return -1;
625       }
626       wrote += wb;
627     }
628   }
629
630   free(sock->wbuf); sock->wbuf = NULL;
631   sock->wbuf_len = 0;
632
633   return 0;
634 } /* }}} */
635
636 static void wipe_ci_values(cache_item_t *ci, time_t when)
637 {
638   ci->values = NULL;
639   ci->values_num = 0;
640
641   ci->last_flush_time = when;
642   if (config_write_jitter > 0)
643     ci->last_flush_time += (rrd_random() % config_write_jitter);
644 }
645
646 /* remove_from_queue
647  * remove a "cache_item_t" item from the queue.
648  * must hold 'cache_lock' when calling this
649  */
650 static void remove_from_queue(cache_item_t *ci) /* {{{ */
651 {
652   if (ci == NULL) return;
653   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
654
655   if (ci->prev == NULL)
656     cache_queue_head = ci->next; /* reset head */
657   else
658     ci->prev->next = ci->next;
659
660   if (ci->next == NULL)
661     cache_queue_tail = ci->prev; /* reset the tail */
662   else
663     ci->next->prev = ci->prev;
664
665   ci->next = ci->prev = NULL;
666   ci->flags &= ~CI_FLAGS_IN_QUEUE;
667
668   pthread_mutex_lock (&stats_lock);
669   assert (stats_queue_length > 0);
670   stats_queue_length--;
671   pthread_mutex_unlock (&stats_lock);
672
673 } /* }}} static void remove_from_queue */
674
675 /* free the resources associated with the cache_item_t
676  * must hold cache_lock when calling this function
677  */
678 static void *free_cache_item(cache_item_t *ci) /* {{{ */
679 {
680   if (ci == NULL) return NULL;
681
682   remove_from_queue(ci);
683
684   for (size_t i=0; i < ci->values_num; i++)
685     free(ci->values[i]);
686
687   free (ci->values);
688   free (ci->file);
689
690   /* in case anyone is waiting */
691   pthread_cond_broadcast(&ci->flushed);
692   pthread_cond_destroy(&ci->flushed);
693
694   free (ci);
695
696   return NULL;
697 } /* }}} static void *free_cache_item */
698
699 /*
700  * enqueue_cache_item:
701  * `cache_lock' must be acquired before calling this function!
702  */
703 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
704     queue_side_t side)
705 {
706   if (ci == NULL)
707     return (-1);
708
709   if (ci->values_num == 0)
710     return (0);
711
712   if (side == HEAD)
713   {
714     if (cache_queue_head == ci)
715       return 0;
716
717     /* remove if further down in queue */
718     remove_from_queue(ci);
719
720     ci->prev = NULL;
721     ci->next = cache_queue_head;
722     if (ci->next != NULL)
723       ci->next->prev = ci;
724     cache_queue_head = ci;
725
726     if (cache_queue_tail == NULL)
727       cache_queue_tail = cache_queue_head;
728   }
729   else /* (side == TAIL) */
730   {
731     /* We don't move values back in the list.. */
732     if (ci->flags & CI_FLAGS_IN_QUEUE)
733       return (0);
734
735     assert (ci->next == NULL);
736     assert (ci->prev == NULL);
737
738     ci->prev = cache_queue_tail;
739
740     if (cache_queue_tail == NULL)
741       cache_queue_head = ci;
742     else
743       cache_queue_tail->next = ci;
744
745     cache_queue_tail = ci;
746   }
747
748   ci->flags |= CI_FLAGS_IN_QUEUE;
749
750   pthread_cond_signal(&queue_cond);
751   pthread_mutex_lock (&stats_lock);
752   stats_queue_length++;
753   pthread_mutex_unlock (&stats_lock);
754
755   return (0);
756 } /* }}} int enqueue_cache_item */
757
758 /*
759  * tree_callback_flush:
760  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
761  * while this is in progress.
762  */
763 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
764     gpointer data)
765 {
766   cache_item_t *ci;
767   callback_flush_data_t *cfd;
768
769   ci = (cache_item_t *) value;
770   cfd = (callback_flush_data_t *) data;
771
772   if (ci->flags & CI_FLAGS_IN_QUEUE)
773     return FALSE;
774
775   if (ci->values_num > 0
776       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
777   {
778     enqueue_cache_item (ci, TAIL);
779   }
780   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
781       && (ci->values_num <= 0))
782   {
783     assert ((char *) key == ci->file);
784     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
785     {
786       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
787       return (FALSE);
788     }
789   }
790
791   return (FALSE);
792 } /* }}} gboolean tree_callback_flush */
793
794 static int flush_old_values (int max_age)
795 {
796   callback_flush_data_t cfd;
797   size_t k;
798
799   memset (&cfd, 0, sizeof (cfd));
800   /* Pass the current time as user data so that we don't need to call
801    * `time' for each node. */
802   cfd.now = time (NULL);
803   cfd.keys = NULL;
804   cfd.keys_num = 0;
805
806   if (max_age > 0)
807     cfd.abs_timeout = cfd.now - max_age;
808   else
809     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
810
811   /* `tree_callback_flush' will return the keys of all values that haven't
812    * been touched in the last `config_flush_interval' seconds in `cfd'.
813    * The char*'s in this array point to the same memory as ci->file, so we
814    * don't need to free them separately. */
815   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
816
817   for (k = 0; k < cfd.keys_num; k++)
818   {
819     /* should never fail, since we have held the cache_lock
820      * the entire time */
821     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
822   }
823
824   if (cfd.keys != NULL)
825   {
826     free (cfd.keys);
827     cfd.keys = NULL;
828   }
829
830   return (0);
831 } /* int flush_old_values */
832
833 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
834 {
835   struct timeval now;
836   struct timespec next_flush;
837   int status;
838
839   gettimeofday (&now, NULL);
840   next_flush.tv_sec = now.tv_sec + config_flush_interval;
841   next_flush.tv_nsec = 1000 * now.tv_usec;
842
843   pthread_mutex_lock(&cache_lock);
844
845   while (state == RUNNING)
846   {
847     gettimeofday (&now, NULL);
848     if ((now.tv_sec > next_flush.tv_sec)
849         || ((now.tv_sec == next_flush.tv_sec)
850           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
851     {
852       RRDD_LOG(LOG_DEBUG, "flushing old values");
853
854       /* Determine the time of the next cache flush. */
855       next_flush.tv_sec = now.tv_sec + config_flush_interval;
856
857       /* Flush all values that haven't been written in the last
858        * `config_write_interval' seconds. */
859       flush_old_values (config_write_interval);
860
861       /* unlock the cache while we rotate so we don't block incoming
862        * updates if the fsync() blocks on disk I/O */
863       pthread_mutex_unlock(&cache_lock);
864       journal_rotate();
865       pthread_mutex_lock(&cache_lock);
866     }
867
868     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
869     if (status != 0 && status != ETIMEDOUT)
870     {
871       RRDD_LOG (LOG_ERR, "flush_thread_main: "
872                 "pthread_cond_timedwait returned %i.", status);
873     }
874   }
875
876   if (config_flush_at_shutdown)
877     flush_old_values (-1); /* flush everything */
878
879   state = SHUTDOWN;
880
881   pthread_mutex_unlock(&cache_lock);
882
883   return NULL;
884 } /* void *flush_thread_main */
885
886 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
887 {
888   pthread_mutex_lock (&cache_lock);
889
890   while (state != SHUTDOWN
891          || (cache_queue_head != NULL && config_flush_at_shutdown))
892   {
893     cache_item_t *ci;
894     char *file;
895     char **values;
896     size_t values_num;
897     int status;
898
899     /* Now, check if there's something to store away. If not, wait until
900      * something comes in. */
901     if (cache_queue_head == NULL)
902     {
903       status = pthread_cond_wait (&queue_cond, &cache_lock);
904       if ((status != 0) && (status != ETIMEDOUT))
905       {
906         RRDD_LOG (LOG_ERR, "queue_thread_main: "
907             "pthread_cond_wait returned %i.", status);
908       }
909     }
910
911     /* Check if a value has arrived. This may be NULL if we timed out or there
912      * was an interrupt such as a signal. */
913     if (cache_queue_head == NULL)
914       continue;
915
916     ci = cache_queue_head;
917
918     /* copy the relevant parts */
919     file = strdup (ci->file);
920     if (file == NULL)
921     {
922       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
923       continue;
924     }
925
926     assert(ci->values != NULL);
927     assert(ci->values_num > 0);
928
929     values = ci->values;
930     values_num = ci->values_num;
931
932     wipe_ci_values(ci, time(NULL));
933     remove_from_queue(ci);
934
935     pthread_mutex_unlock (&cache_lock);
936
937     rrd_clear_error ();
938     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
939     if (status != 0)
940     {
941       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
942           "rrd_update_r (%s) failed with status %i. (%s)",
943           file, status, rrd_get_error());
944     }
945
946     journal_write("wrote", file);
947
948     /* Search again in the tree.  It's possible someone issued a "FORGET"
949      * while we were writing the update values. */
950     pthread_mutex_lock(&cache_lock);
951     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
952     if (ci)
953       pthread_cond_broadcast(&ci->flushed);
954     pthread_mutex_unlock(&cache_lock);
955
956     if (status == 0)
957     {
958       pthread_mutex_lock (&stats_lock);
959       stats_updates_written++;
960       stats_data_sets_written += values_num;
961       pthread_mutex_unlock (&stats_lock);
962     }
963
964     rrd_free_ptrs((void ***) &values, &values_num);
965     free(file);
966
967     pthread_mutex_lock (&cache_lock);
968   }
969   pthread_mutex_unlock (&cache_lock);
970
971   return (NULL);
972 } /* }}} void *queue_thread_main */
973
974 static int buffer_get_field (char **buffer_ret, /* {{{ */
975     size_t *buffer_size_ret, char **field_ret)
976 {
977   char *buffer;
978   size_t buffer_pos;
979   size_t buffer_size;
980   char *field;
981   size_t field_size;
982   int status;
983
984   buffer = *buffer_ret;
985   buffer_pos = 0;
986   buffer_size = *buffer_size_ret;
987   field = *buffer_ret;
988   field_size = 0;
989
990   if (buffer_size <= 0)
991     return (-1);
992
993   /* This is ensured by `handle_request'. */
994   assert (buffer[buffer_size - 1] == '\0');
995
996   status = -1;
997   while (buffer_pos < buffer_size)
998   {
999     /* Check for end-of-field or end-of-buffer */
1000     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1001     {
1002       field[field_size] = 0;
1003       field_size++;
1004       buffer_pos++;
1005       status = 0;
1006       break;
1007     }
1008     /* Handle escaped characters. */
1009     else if (buffer[buffer_pos] == '\\')
1010     {
1011       if (buffer_pos >= (buffer_size - 1))
1012         break;
1013       buffer_pos++;
1014       field[field_size] = buffer[buffer_pos];
1015       field_size++;
1016       buffer_pos++;
1017     }
1018     /* Normal operation */ 
1019     else
1020     {
1021       field[field_size] = buffer[buffer_pos];
1022       field_size++;
1023       buffer_pos++;
1024     }
1025   } /* while (buffer_pos < buffer_size) */
1026
1027   if (status != 0)
1028     return (status);
1029
1030   *buffer_ret = buffer + buffer_pos;
1031   *buffer_size_ret = buffer_size - buffer_pos;
1032   *field_ret = field;
1033
1034   return (0);
1035 } /* }}} int buffer_get_field */
1036
1037 /* if we're restricting writes to the base directory,
1038  * check whether the file falls within the dir
1039  * returns 1 if OK, otherwise 0
1040  */
1041 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1042 {
1043   assert(file != NULL);
1044
1045   if (!config_write_base_only
1046       || sock == NULL /* journal replay */
1047       || config_base_dir == NULL)
1048     return 1;
1049
1050   if (strstr(file, "../") != NULL) goto err;
1051
1052   /* relative paths without "../" are ok */
1053   if (*file != '/') return 1;
1054
1055   /* file must be of the format base + "/" + <1+ char filename> */
1056   if (strlen(file) < _config_base_dir_len + 2) goto err;
1057   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1058   if (*(file + _config_base_dir_len) != '/') goto err;
1059
1060   return 1;
1061
1062 err:
1063   if (sock != NULL && sock->fd >= 0)
1064     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1065
1066   return 0;
1067 } /* }}} static int check_file_access */
1068
1069 /* when using a base dir, convert relative paths to absolute paths.
1070  * if necessary, modifies the "filename" pointer to point
1071  * to the new path created in "tmp".  "tmp" is provided
1072  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1073  *
1074  * this allows us to optimize for the expected case (absolute path)
1075  * with a no-op.
1076  */
1077 static void get_abs_path(char **filename, char *tmp)
1078 {
1079   assert(tmp != NULL);
1080   assert(filename != NULL && *filename != NULL);
1081
1082   if (config_base_dir == NULL || **filename == '/')
1083     return;
1084
1085   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1086   *filename = tmp;
1087 } /* }}} static int get_abs_path */
1088
1089 static int flush_file (const char *filename) /* {{{ */
1090 {
1091   cache_item_t *ci;
1092
1093   pthread_mutex_lock (&cache_lock);
1094
1095   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1096   if (ci == NULL)
1097   {
1098     pthread_mutex_unlock (&cache_lock);
1099     return (ENOENT);
1100   }
1101
1102   if (ci->values_num > 0)
1103   {
1104     /* Enqueue at head */
1105     enqueue_cache_item (ci, HEAD);
1106     pthread_cond_wait(&ci->flushed, &cache_lock);
1107   }
1108
1109   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1110    * may have been purged during our cond_wait() */
1111
1112   pthread_mutex_unlock(&cache_lock);
1113
1114   return (0);
1115 } /* }}} int flush_file */
1116
1117 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1118 {
1119   char *err = "Syntax error.\n";
1120
1121   if (cmd && cmd->syntax)
1122     err = cmd->syntax;
1123
1124   return send_response(sock, RESP_ERR, "Usage: %s", err);
1125 } /* }}} static int syntax_error() */
1126
1127 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1128 {
1129   uint64_t copy_queue_length;
1130   uint64_t copy_updates_received;
1131   uint64_t copy_flush_received;
1132   uint64_t copy_updates_written;
1133   uint64_t copy_data_sets_written;
1134   uint64_t copy_journal_bytes;
1135   uint64_t copy_journal_rotate;
1136
1137   uint64_t tree_nodes_number;
1138   uint64_t tree_depth;
1139
1140   pthread_mutex_lock (&stats_lock);
1141   copy_queue_length       = stats_queue_length;
1142   copy_updates_received   = stats_updates_received;
1143   copy_flush_received     = stats_flush_received;
1144   copy_updates_written    = stats_updates_written;
1145   copy_data_sets_written  = stats_data_sets_written;
1146   copy_journal_bytes      = stats_journal_bytes;
1147   copy_journal_rotate     = stats_journal_rotate;
1148   pthread_mutex_unlock (&stats_lock);
1149
1150   pthread_mutex_lock (&cache_lock);
1151   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1152   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1153   pthread_mutex_unlock (&cache_lock);
1154
1155   add_response_info(sock,
1156                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1157   add_response_info(sock,
1158                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1159   add_response_info(sock,
1160                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1161   add_response_info(sock,
1162                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1163   add_response_info(sock,
1164                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1165   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1166   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1167   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1168   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1169
1170   send_response(sock, RESP_OK, "Statistics follow\n");
1171
1172   return (0);
1173 } /* }}} int handle_request_stats */
1174
1175 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1176 {
1177   char *file, file_tmp[PATH_MAX];
1178   int status;
1179
1180   status = buffer_get_field (&buffer, &buffer_size, &file);
1181   if (status != 0)
1182   {
1183     return syntax_error(sock,cmd);
1184   }
1185   else
1186   {
1187     pthread_mutex_lock(&stats_lock);
1188     stats_flush_received++;
1189     pthread_mutex_unlock(&stats_lock);
1190
1191     get_abs_path(&file, file_tmp);
1192     if (!check_file_access(file, sock)) return 0;
1193
1194     status = flush_file (file);
1195     if (status == 0)
1196       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1197     else if (status == ENOENT)
1198     {
1199       /* no file in our tree; see whether it exists at all */
1200       struct stat statbuf;
1201
1202       memset(&statbuf, 0, sizeof(statbuf));
1203       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1204         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1205       else
1206         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1207     }
1208     else if (status < 0)
1209       return send_response(sock, RESP_ERR, "Internal error.\n");
1210     else
1211       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1212   }
1213
1214   /* NOTREACHED */
1215   assert(1==0);
1216 } /* }}} int handle_request_flush */
1217
1218 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1219 {
1220   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1221
1222   pthread_mutex_lock(&cache_lock);
1223   flush_old_values(-1);
1224   pthread_mutex_unlock(&cache_lock);
1225
1226   return send_response(sock, RESP_OK, "Started flush.\n");
1227 } /* }}} static int handle_request_flushall */
1228
1229 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1230 {
1231   int status;
1232   char *file, file_tmp[PATH_MAX];
1233   cache_item_t *ci;
1234
1235   status = buffer_get_field(&buffer, &buffer_size, &file);
1236   if (status != 0)
1237     return syntax_error(sock,cmd);
1238
1239   get_abs_path(&file, file_tmp);
1240
1241   pthread_mutex_lock(&cache_lock);
1242   ci = g_tree_lookup(cache_tree, file);
1243   if (ci == NULL)
1244   {
1245     pthread_mutex_unlock(&cache_lock);
1246     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1247   }
1248
1249   for (size_t i=0; i < ci->values_num; i++)
1250     add_response_info(sock, "%s\n", ci->values[i]);
1251
1252   pthread_mutex_unlock(&cache_lock);
1253   return send_response(sock, RESP_OK, "updates pending\n");
1254 } /* }}} static int handle_request_pending */
1255
1256 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1257 {
1258   int status;
1259   gboolean found;
1260   char *file, file_tmp[PATH_MAX];
1261
1262   status = buffer_get_field(&buffer, &buffer_size, &file);
1263   if (status != 0)
1264     return syntax_error(sock,cmd);
1265
1266   get_abs_path(&file, file_tmp);
1267   if (!check_file_access(file, sock)) return 0;
1268
1269   pthread_mutex_lock(&cache_lock);
1270   found = g_tree_remove(cache_tree, file);
1271   pthread_mutex_unlock(&cache_lock);
1272
1273   if (found == TRUE)
1274   {
1275     if (sock != NULL)
1276       journal_write("forget", file);
1277
1278     return send_response(sock, RESP_OK, "Gone!\n");
1279   }
1280   else
1281     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1282
1283   /* NOTREACHED */
1284   assert(1==0);
1285 } /* }}} static int handle_request_forget */
1286
1287 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1288 {
1289   cache_item_t *ci;
1290
1291   pthread_mutex_lock(&cache_lock);
1292
1293   ci = cache_queue_head;
1294   while (ci != NULL)
1295   {
1296     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1297     ci = ci->next;
1298   }
1299
1300   pthread_mutex_unlock(&cache_lock);
1301
1302   return send_response(sock, RESP_OK, "in queue.\n");
1303 } /* }}} int handle_request_queue */
1304
1305 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1306 {
1307   char *file, file_tmp[PATH_MAX];
1308   int values_num = 0;
1309   int status;
1310   char orig_buf[CMD_MAX];
1311
1312   cache_item_t *ci;
1313
1314   /* save it for the journal later */
1315   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1316
1317   status = buffer_get_field (&buffer, &buffer_size, &file);
1318   if (status != 0)
1319     return syntax_error(sock,cmd);
1320
1321   pthread_mutex_lock(&stats_lock);
1322   stats_updates_received++;
1323   pthread_mutex_unlock(&stats_lock);
1324
1325   get_abs_path(&file, file_tmp);
1326   if (!check_file_access(file, sock)) return 0;
1327
1328   pthread_mutex_lock (&cache_lock);
1329   ci = g_tree_lookup (cache_tree, file);
1330
1331   if (ci == NULL) /* {{{ */
1332   {
1333     struct stat statbuf;
1334     cache_item_t *tmp;
1335
1336     /* don't hold the lock while we setup; stat(2) might block */
1337     pthread_mutex_unlock(&cache_lock);
1338
1339     memset (&statbuf, 0, sizeof (statbuf));
1340     status = stat (file, &statbuf);
1341     if (status != 0)
1342     {
1343       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1344
1345       status = errno;
1346       if (status == ENOENT)
1347         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1348       else
1349         return send_response(sock, RESP_ERR,
1350                              "stat failed with error %i.\n", status);
1351     }
1352     if (!S_ISREG (statbuf.st_mode))
1353       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1354
1355     if (access(file, R_OK|W_OK) != 0)
1356       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1357                            file, rrd_strerror(errno));
1358
1359     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1360     if (ci == NULL)
1361     {
1362       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1363
1364       return send_response(sock, RESP_ERR, "malloc failed.\n");
1365     }
1366     memset (ci, 0, sizeof (cache_item_t));
1367
1368     ci->file = strdup (file);
1369     if (ci->file == NULL)
1370     {
1371       free (ci);
1372       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1373
1374       return send_response(sock, RESP_ERR, "strdup failed.\n");
1375     }
1376
1377     wipe_ci_values(ci, now);
1378     ci->flags = CI_FLAGS_IN_TREE;
1379     pthread_cond_init(&ci->flushed, NULL);
1380
1381     pthread_mutex_lock(&cache_lock);
1382
1383     /* another UPDATE might have added this entry in the meantime */
1384     tmp = g_tree_lookup (cache_tree, file);
1385     if (tmp == NULL)
1386       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1387     else
1388     {
1389       free_cache_item (ci);
1390       ci = tmp;
1391     }
1392
1393     /* state may have changed while we were unlocked */
1394     if (state == SHUTDOWN)
1395       return -1;
1396   } /* }}} */
1397   assert (ci != NULL);
1398
1399   /* don't re-write updates in replay mode */
1400   if (sock != NULL)
1401     journal_write("update", orig_buf);
1402
1403   while (buffer_size > 0)
1404   {
1405     char *value;
1406     time_t stamp;
1407     char *eostamp;
1408
1409     status = buffer_get_field (&buffer, &buffer_size, &value);
1410     if (status != 0)
1411     {
1412       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1413       break;
1414     }
1415
1416     /* make sure update time is always moving forward */
1417     stamp = strtol(value, &eostamp, 10);
1418     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419     {
1420       pthread_mutex_unlock(&cache_lock);
1421       return send_response(sock, RESP_ERR,
1422                            "Cannot find timestamp in '%s'!\n", value);
1423     }
1424     else if (stamp <= ci->last_update_stamp)
1425     {
1426       pthread_mutex_unlock(&cache_lock);
1427       return send_response(sock, RESP_ERR,
1428                            "illegal attempt to update using time %ld when last"
1429                            " update time is %ld (minimum one second step)\n",
1430                            stamp, ci->last_update_stamp);
1431     }
1432     else
1433       ci->last_update_stamp = stamp;
1434
1435     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1436     {
1437       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1438       continue;
1439     }
1440
1441     values_num++;
1442   }
1443
1444   if (((now - ci->last_flush_time) >= config_write_interval)
1445       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1446       && (ci->values_num > 0))
1447   {
1448     enqueue_cache_item (ci, TAIL);
1449   }
1450
1451   pthread_mutex_unlock (&cache_lock);
1452
1453   if (values_num < 1)
1454     return send_response(sock, RESP_ERR, "No values updated.\n");
1455   else
1456     return send_response(sock, RESP_OK,
1457                          "errors, enqueued %i value(s).\n", values_num);
1458
1459   /* NOTREACHED */
1460   assert(1==0);
1461
1462 } /* }}} int handle_request_update */
1463
1464 /* we came across a "WROTE" entry during journal replay.
1465  * throw away any values that we have accumulated for this file
1466  */
1467 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1468 {
1469   cache_item_t *ci;
1470   const char *file = buffer;
1471
1472   pthread_mutex_lock(&cache_lock);
1473
1474   ci = g_tree_lookup(cache_tree, file);
1475   if (ci == NULL)
1476   {
1477     pthread_mutex_unlock(&cache_lock);
1478     return (0);
1479   }
1480
1481   if (ci->values)
1482     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1483
1484   wipe_ci_values(ci, now);
1485   remove_from_queue(ci);
1486
1487   pthread_mutex_unlock(&cache_lock);
1488   return (0);
1489 } /* }}} int handle_request_wrote */
1490
1491 /* start "BATCH" processing */
1492 static int batch_start (HANDLER_PROTO) /* {{{ */
1493 {
1494   int status;
1495   if (sock->batch_start)
1496     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1497
1498   status = send_response(sock, RESP_OK,
1499                          "Go ahead.  End with dot '.' on its own line.\n");
1500   sock->batch_start = time(NULL);
1501   sock->batch_cmd = 0;
1502
1503   return status;
1504 } /* }}} static int batch_start */
1505
1506 /* finish "BATCH" processing and return results to the client */
1507 static int batch_done (HANDLER_PROTO) /* {{{ */
1508 {
1509   assert(sock->batch_start);
1510   sock->batch_start = 0;
1511   sock->batch_cmd  = 0;
1512   return send_response(sock, RESP_OK, "errors\n");
1513 } /* }}} static int batch_done */
1514
1515 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1516 {
1517   return -1;
1518 } /* }}} static int handle_request_quit */
1519
1520 static command_t list_of_commands[] = { /* {{{ */
1521   {
1522     "UPDATE",
1523     handle_request_update,
1524     CMD_CONTEXT_ANY,
1525     "UPDATE <filename> <values> [<values> ...]\n"
1526     ,
1527     "Adds the given file to the internal cache if it is not yet known and\n"
1528     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1529     "for details.\n"
1530     "\n"
1531     "Each <values> has the following form:\n"
1532     "  <values> = <time>:<value>[:<value>[...]]\n"
1533     "See the rrdupdate(1) manpage for details.\n"
1534   },
1535   {
1536     "WROTE",
1537     handle_request_wrote,
1538     CMD_CONTEXT_JOURNAL,
1539     NULL,
1540     NULL
1541   },
1542   {
1543     "FLUSH",
1544     handle_request_flush,
1545     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1546     "FLUSH <filename>\n"
1547     ,
1548     "Adds the given filename to the head of the update queue and returns\n"
1549     "after it has been dequeued.\n"
1550   },
1551   {
1552     "FLUSHALL",
1553     handle_request_flushall,
1554     CMD_CONTEXT_CLIENT,
1555     "FLUSHALL\n"
1556     ,
1557     "Triggers writing of all pending updates.  Returns immediately.\n"
1558   },
1559   {
1560     "PENDING",
1561     handle_request_pending,
1562     CMD_CONTEXT_CLIENT,
1563     "PENDING <filename>\n"
1564     ,
1565     "Shows any 'pending' updates for a file, in order.\n"
1566     "The updates shown have not yet been written to the underlying RRD file.\n"
1567   },
1568   {
1569     "FORGET",
1570     handle_request_forget,
1571     CMD_CONTEXT_ANY,
1572     "FORGET <filename>\n"
1573     ,
1574     "Removes the file completely from the cache.\n"
1575     "Any pending updates for the file will be lost.\n"
1576   },
1577   {
1578     "QUEUE",
1579     handle_request_queue,
1580     CMD_CONTEXT_CLIENT,
1581     "QUEUE\n"
1582     ,
1583         "Shows all files in the output queue.\n"
1584     "The output is zero or more lines in the following format:\n"
1585     "(where <num_vals> is the number of values to be written)\n"
1586     "\n"
1587     "<num_vals> <filename>\n"
1588   },
1589   {
1590     "STATS",
1591     handle_request_stats,
1592     CMD_CONTEXT_CLIENT,
1593     "STATS\n"
1594     ,
1595     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1596     "a description of the values.\n"
1597   },
1598   {
1599     "HELP",
1600     handle_request_help,
1601     CMD_CONTEXT_CLIENT,
1602     "HELP [<command>]\n",
1603     NULL, /* special! */
1604   },
1605   {
1606     "BATCH",
1607     batch_start,
1608     CMD_CONTEXT_CLIENT,
1609     "BATCH\n"
1610     ,
1611     "The 'BATCH' command permits the client to initiate a bulk load\n"
1612     "   of commands to rrdcached.\n"
1613     "\n"
1614     "Usage:\n"
1615     "\n"
1616     "    client: BATCH\n"
1617     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1618     "    client: command #1\n"
1619     "    client: command #2\n"
1620     "    client: ... and so on\n"
1621     "    client: .\n"
1622     "    server: 2 errors\n"
1623     "    server: 7 message for command #7\n"
1624     "    server: 9 message for command #9\n"
1625     "\n"
1626     "For more information, consult the rrdcached(1) documentation.\n"
1627   },
1628   {
1629     ".",   /* BATCH terminator */
1630     batch_done,
1631     CMD_CONTEXT_BATCH,
1632     NULL,
1633     NULL
1634   },
1635   {
1636     "QUIT",
1637     handle_request_quit,
1638     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1639     "QUIT\n"
1640     ,
1641     "Disconnect from rrdcached.\n"
1642   }
1643 }; /* }}} command_t list_of_commands[] */
1644 static size_t list_of_commands_len = sizeof (list_of_commands)
1645   / sizeof (list_of_commands[0]);
1646
1647 static command_t *find_command(char *cmd)
1648 {
1649   size_t i;
1650
1651   for (i = 0; i < list_of_commands_len; i++)
1652     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1653       return (&list_of_commands[i]);
1654   return NULL;
1655 }
1656
1657 /* We currently use the index in the `list_of_commands' array as a bit position
1658  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1659  * outside these functions so that switching to a more elegant storage method
1660  * is easily possible. */
1661 static ssize_t find_command_index (const char *cmd) /* {{{ */
1662 {
1663   size_t i;
1664
1665   for (i = 0; i < list_of_commands_len; i++)
1666     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1667       return ((ssize_t) i);
1668   return (-1);
1669 } /* }}} ssize_t find_command_index */
1670
1671 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1672     const char *cmd)
1673 {
1674   ssize_t i;
1675
1676   if (cmd == NULL)
1677     return (-1);
1678
1679   if ((strcasecmp ("QUIT", cmd) == 0)
1680       || (strcasecmp ("HELP", cmd) == 0))
1681     return (1);
1682   else if (strcmp (".", cmd) == 0)
1683     cmd = "BATCH";
1684
1685   i = find_command_index (cmd);
1686   if (i < 0)
1687     return (-1);
1688   assert (i < 32);
1689
1690   if ((sock->permissions & (1 << i)) != 0)
1691     return (1);
1692   return (0);
1693 } /* }}} int socket_permission_check */
1694
1695 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1696     const char *cmd)
1697 {
1698   ssize_t i;
1699
1700   i = find_command_index (cmd);
1701   if (i < 0)
1702     return (-1);
1703   assert (i < 32);
1704
1705   sock->permissions |= (1 << i);
1706   return (0);
1707 } /* }}} int socket_permission_add */
1708
1709 /* check whether commands are received in the expected context */
1710 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1711 {
1712   if (sock == NULL)
1713     return (cmd->context & CMD_CONTEXT_JOURNAL);
1714   else if (sock->batch_start)
1715     return (cmd->context & CMD_CONTEXT_BATCH);
1716   else
1717     return (cmd->context & CMD_CONTEXT_CLIENT);
1718
1719   /* NOTREACHED */
1720   assert(1==0);
1721 }
1722
1723 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1724 {
1725   int status;
1726   char *cmd_str;
1727   char *resp_txt;
1728   command_t *help = NULL;
1729
1730   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1731   if (status == 0)
1732     help = find_command(cmd_str);
1733
1734   if (help && (help->syntax || help->help))
1735   {
1736     char tmp[CMD_MAX];
1737
1738     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1739     resp_txt = tmp;
1740
1741     if (help->syntax)
1742       add_response_info(sock, "Usage: %s\n", help->syntax);
1743
1744     if (help->help)
1745       add_response_info(sock, "%s\n", help->help);
1746   }
1747   else
1748   {
1749     size_t i;
1750
1751     resp_txt = "Command overview\n";
1752
1753     for (i = 0; i < list_of_commands_len; i++)
1754     {
1755       if (list_of_commands[i].syntax == NULL)
1756         continue;
1757       add_response_info (sock, "%s", list_of_commands[i].syntax);
1758     }
1759   }
1760
1761   return send_response(sock, RESP_OK, resp_txt);
1762 } /* }}} int handle_request_help */
1763
1764 /* if sock==NULL, we are in journal replay mode */
1765 static int handle_request (DISPATCH_PROTO) /* {{{ */
1766 {
1767   char *buffer_ptr = buffer;
1768   char *cmd_str = NULL;
1769   command_t *cmd = NULL;
1770   int status;
1771
1772   assert (buffer[buffer_size - 1] == '\0');
1773
1774   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1775   if (status != 0)
1776   {
1777     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1778     return (-1);
1779   }
1780
1781   if (sock != NULL && sock->batch_start)
1782     sock->batch_cmd++;
1783
1784   cmd = find_command(cmd_str);
1785   if (!cmd)
1786     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1787
1788   if (!socket_permission_check (sock, cmd->cmd))
1789     return send_response(sock, RESP_ERR, "Permission denied.\n");
1790
1791   if (!command_check_context(sock, cmd))
1792     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1793
1794   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1795 } /* }}} int handle_request */
1796
1797 static void journal_set_free (journal_set *js) /* {{{ */
1798 {
1799   if (js == NULL)
1800     return;
1801
1802   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1803
1804   free(js);
1805 } /* }}} journal_set_free */
1806
1807 static void journal_set_remove (journal_set *js) /* {{{ */
1808 {
1809   if (js == NULL)
1810     return;
1811
1812   for (uint i=0; i < js->files_num; i++)
1813   {
1814     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1815     unlink(js->files[i]);
1816   }
1817 } /* }}} journal_set_remove */
1818
1819 /* close current journal file handle.
1820  * MUST hold journal_lock before calling */
1821 static void journal_close(void) /* {{{ */
1822 {
1823   if (journal_fh != NULL)
1824   {
1825     if (fclose(journal_fh) != 0)
1826       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1827   }
1828
1829   journal_fh = NULL;
1830   journal_size = 0;
1831 } /* }}} journal_close */
1832
1833 /* MUST hold journal_lock before calling */
1834 static void journal_new_file(void) /* {{{ */
1835 {
1836   struct timeval now;
1837   int  new_fd;
1838   char new_file[PATH_MAX + 1];
1839
1840   assert(journal_dir != NULL);
1841   assert(journal_cur != NULL);
1842
1843   journal_close();
1844
1845   gettimeofday(&now, NULL);
1846   /* this format assures that the files sort in strcmp() order */
1847   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1848            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1849
1850   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1851                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1852   if (new_fd < 0)
1853     goto error;
1854
1855   journal_fh = fdopen(new_fd, "a");
1856   if (journal_fh == NULL)
1857     goto error;
1858
1859   journal_size = ftell(journal_fh);
1860   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1861
1862   /* record the file in the journal set */
1863   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1864
1865   return;
1866
1867 error:
1868   RRDD_LOG(LOG_CRIT,
1869            "JOURNALING DISABLED: Error while trying to create %s : %s",
1870            new_file, rrd_strerror(errno));
1871   RRDD_LOG(LOG_CRIT,
1872            "JOURNALING DISABLED: All values will be flushed at shutdown");
1873
1874   close(new_fd);
1875   config_flush_at_shutdown = 1;
1876
1877 } /* }}} journal_new_file */
1878
1879 /* MUST NOT hold journal_lock before calling this */
1880 static void journal_rotate(void) /* {{{ */
1881 {
1882   journal_set *old_js = NULL;
1883
1884   if (journal_dir == NULL)
1885     return;
1886
1887   RRDD_LOG(LOG_DEBUG, "rotating journals");
1888
1889   pthread_mutex_lock(&stats_lock);
1890   ++stats_journal_rotate;
1891   pthread_mutex_unlock(&stats_lock);
1892
1893   pthread_mutex_lock(&journal_lock);
1894
1895   journal_close();
1896
1897   /* rotate the journal sets */
1898   old_js = journal_old;
1899   journal_old = journal_cur;
1900   journal_cur = calloc(1, sizeof(journal_set));
1901
1902   if (journal_cur != NULL)
1903     journal_new_file();
1904   else
1905     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1906
1907   pthread_mutex_unlock(&journal_lock);
1908
1909   journal_set_remove(old_js);
1910   journal_set_free  (old_js);
1911
1912 } /* }}} static void journal_rotate */
1913
1914 /* MUST hold journal_lock when calling */
1915 static void journal_done(void) /* {{{ */
1916 {
1917   if (journal_cur == NULL)
1918     return;
1919
1920   journal_close();
1921
1922   if (config_flush_at_shutdown)
1923   {
1924     RRDD_LOG(LOG_INFO, "removing journals");
1925     journal_set_remove(journal_old);
1926     journal_set_remove(journal_cur);
1927   }
1928   else
1929   {
1930     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1931              "journals will be used at next startup");
1932   }
1933
1934   journal_set_free(journal_cur);
1935   journal_set_free(journal_old);
1936   free(journal_dir);
1937
1938 } /* }}} static void journal_done */
1939
1940 static int journal_write(char *cmd, char *args) /* {{{ */
1941 {
1942   int chars;
1943
1944   if (journal_fh == NULL)
1945     return 0;
1946
1947   pthread_mutex_lock(&journal_lock);
1948   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1949   journal_size += chars;
1950
1951   if (journal_size > JOURNAL_MAX)
1952     journal_new_file();
1953
1954   pthread_mutex_unlock(&journal_lock);
1955
1956   if (chars > 0)
1957   {
1958     pthread_mutex_lock(&stats_lock);
1959     stats_journal_bytes += chars;
1960     pthread_mutex_unlock(&stats_lock);
1961   }
1962
1963   return chars;
1964 } /* }}} static int journal_write */
1965
1966 static int journal_replay (const char *file) /* {{{ */
1967 {
1968   FILE *fh;
1969   int entry_cnt = 0;
1970   int fail_cnt = 0;
1971   uint64_t line = 0;
1972   char entry[CMD_MAX];
1973   time_t now;
1974
1975   if (file == NULL) return 0;
1976
1977   {
1978     char *reason = "unknown error";
1979     int status = 0;
1980     struct stat statbuf;
1981
1982     memset(&statbuf, 0, sizeof(statbuf));
1983     if (stat(file, &statbuf) != 0)
1984     {
1985       reason = "stat error";
1986       status = errno;
1987     }
1988     else if (!S_ISREG(statbuf.st_mode))
1989     {
1990       reason = "not a regular file";
1991       status = EPERM;
1992     }
1993     if (statbuf.st_uid != daemon_uid)
1994     {
1995       reason = "not owned by daemon user";
1996       status = EACCES;
1997     }
1998     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1999     {
2000       reason = "must not be user/group writable";
2001       status = EACCES;
2002     }
2003
2004     if (status != 0)
2005     {
2006       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2007                file, rrd_strerror(status), reason);
2008       return 0;
2009     }
2010   }
2011
2012   fh = fopen(file, "r");
2013   if (fh == NULL)
2014   {
2015     if (errno != ENOENT)
2016       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2017                file, rrd_strerror(errno));
2018     return 0;
2019   }
2020   else
2021     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2022
2023   now = time(NULL);
2024
2025   while(!feof(fh))
2026   {
2027     size_t entry_len;
2028
2029     ++line;
2030     if (fgets(entry, sizeof(entry), fh) == NULL)
2031       break;
2032     entry_len = strlen(entry);
2033
2034     /* check \n termination in case journal writing crashed mid-line */
2035     if (entry_len == 0)
2036       continue;
2037     else if (entry[entry_len - 1] != '\n')
2038     {
2039       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2040       ++fail_cnt;
2041       continue;
2042     }
2043
2044     entry[entry_len - 1] = '\0';
2045
2046     if (handle_request(NULL, now, entry, entry_len) == 0)
2047       ++entry_cnt;
2048     else
2049       ++fail_cnt;
2050   }
2051
2052   fclose(fh);
2053
2054   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2055            entry_cnt, fail_cnt);
2056
2057   return entry_cnt > 0 ? 1 : 0;
2058 } /* }}} static int journal_replay */
2059
2060 static int journal_sort(const void *v1, const void *v2)
2061 {
2062   char **jn1 = (char **) v1;
2063   char **jn2 = (char **) v2;
2064
2065   return strcmp(*jn1,*jn2);
2066 }
2067
2068 static void journal_init(void) /* {{{ */
2069 {
2070   int had_journal = 0;
2071   DIR *dir;
2072   struct dirent *dent;
2073   char path[PATH_MAX+1];
2074
2075   if (journal_dir == NULL) return;
2076
2077   pthread_mutex_lock(&journal_lock);
2078
2079   journal_cur = calloc(1, sizeof(journal_set));
2080   if (journal_cur == NULL)
2081   {
2082     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2083     return;
2084   }
2085
2086   RRDD_LOG(LOG_INFO, "checking for journal files");
2087
2088   /* Handle old journal files during transition.  This gives them the
2089    * correct sort order.  TODO: remove after first release
2090    */
2091   {
2092     char old_path[PATH_MAX+1];
2093     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2094     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2095     rename(old_path, path);
2096
2097     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2098     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2099     rename(old_path, path);
2100   }
2101
2102   dir = opendir(journal_dir);
2103   while ((dent = readdir(dir)) != NULL)
2104   {
2105     /* looks like a journal file? */
2106     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2107       continue;
2108
2109     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2110
2111     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2112     {
2113       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2114                dent->d_name);
2115       break;
2116     }
2117   }
2118   closedir(dir);
2119
2120   qsort(journal_cur->files, journal_cur->files_num,
2121         sizeof(journal_cur->files[0]), journal_sort);
2122
2123   for (uint i=0; i < journal_cur->files_num; i++)
2124     had_journal += journal_replay(journal_cur->files[i]);
2125
2126   journal_new_file();
2127
2128   /* it must have been a crash.  start a flush */
2129   if (had_journal && config_flush_at_shutdown)
2130     flush_old_values(-1);
2131
2132   pthread_mutex_unlock(&journal_lock);
2133
2134   RRDD_LOG(LOG_INFO, "journal processing complete");
2135
2136 } /* }}} static void journal_init */
2137
2138 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2139 {
2140   assert(sock != NULL);
2141
2142   free(sock->rbuf);  sock->rbuf = NULL;
2143   free(sock->wbuf);  sock->wbuf = NULL;
2144   free(sock);
2145 } /* }}} void free_listen_socket */
2146
2147 static void close_connection(listen_socket_t *sock) /* {{{ */
2148 {
2149   if (sock->fd >= 0)
2150   {
2151     close(sock->fd);
2152     sock->fd = -1;
2153   }
2154
2155   free_listen_socket(sock);
2156
2157 } /* }}} void close_connection */
2158
2159 static void *connection_thread_main (void *args) /* {{{ */
2160 {
2161   listen_socket_t *sock;
2162   int fd;
2163
2164   sock = (listen_socket_t *) args;
2165   fd = sock->fd;
2166
2167   /* init read buffers */
2168   sock->next_read = sock->next_cmd = 0;
2169   sock->rbuf = malloc(RBUF_SIZE);
2170   if (sock->rbuf == NULL)
2171   {
2172     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2173     close_connection(sock);
2174     return NULL;
2175   }
2176
2177   pthread_mutex_lock (&connection_threads_lock);
2178   connection_threads_num++;
2179   pthread_mutex_unlock (&connection_threads_lock);
2180
2181   while (state == RUNNING)
2182   {
2183     char *cmd;
2184     ssize_t cmd_len;
2185     ssize_t rbytes;
2186     time_t now;
2187
2188     struct pollfd pollfd;
2189     int status;
2190
2191     pollfd.fd = fd;
2192     pollfd.events = POLLIN | POLLPRI;
2193     pollfd.revents = 0;
2194
2195     status = poll (&pollfd, 1, /* timeout = */ 500);
2196     if (state != RUNNING)
2197       break;
2198     else if (status == 0) /* timeout */
2199       continue;
2200     else if (status < 0) /* error */
2201     {
2202       status = errno;
2203       if (status != EINTR)
2204         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2205       continue;
2206     }
2207
2208     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2209       break;
2210     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2211     {
2212       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2213           "poll(2) returned something unexpected: %#04hx",
2214           pollfd.revents);
2215       break;
2216     }
2217
2218     rbytes = read(fd, sock->rbuf + sock->next_read,
2219                   RBUF_SIZE - sock->next_read);
2220     if (rbytes < 0)
2221     {
2222       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2223       break;
2224     }
2225     else if (rbytes == 0)
2226       break; /* eof */
2227
2228     sock->next_read += rbytes;
2229
2230     if (sock->batch_start)
2231       now = sock->batch_start;
2232     else
2233       now = time(NULL);
2234
2235     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2236     {
2237       status = handle_request (sock, now, cmd, cmd_len+1);
2238       if (status != 0)
2239         goto out_close;
2240     }
2241   }
2242
2243 out_close:
2244   close_connection(sock);
2245
2246   /* Remove this thread from the connection threads list */
2247   pthread_mutex_lock (&connection_threads_lock);
2248   connection_threads_num--;
2249   if (connection_threads_num <= 0)
2250     pthread_cond_broadcast(&connection_threads_done);
2251   pthread_mutex_unlock (&connection_threads_lock);
2252
2253   return (NULL);
2254 } /* }}} void *connection_thread_main */
2255
2256 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2257 {
2258   int fd;
2259   struct sockaddr_un sa;
2260   listen_socket_t *temp;
2261   int status;
2262   const char *path;
2263   char *path_copy, *dir;
2264
2265   path = sock->addr;
2266   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2267     path += strlen("unix:");
2268
2269   /* dirname may modify its argument */
2270   path_copy = strdup(path);
2271   if (path_copy == NULL)
2272   {
2273     fprintf(stderr, "rrdcached: strdup(): %s\n",
2274         rrd_strerror(errno));
2275     return (-1);
2276   }
2277
2278   dir = dirname(path_copy);
2279   if (rrd_mkdir_p(dir, 0777) != 0)
2280   {
2281     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2282         dir, rrd_strerror(errno));
2283     return (-1);
2284   }
2285
2286   free(path_copy);
2287
2288   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2289       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2290   if (temp == NULL)
2291   {
2292     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2293     return (-1);
2294   }
2295   listen_fds = temp;
2296   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2297
2298   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2299   if (fd < 0)
2300   {
2301     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2302              rrd_strerror(errno));
2303     return (-1);
2304   }
2305
2306   memset (&sa, 0, sizeof (sa));
2307   sa.sun_family = AF_UNIX;
2308   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2309
2310   /* if we've gotten this far, we own the pid file.  any daemon started
2311    * with the same args must not be alive.  therefore, ensure that we can
2312    * create the socket...
2313    */
2314   unlink(path);
2315
2316   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2317   if (status != 0)
2318   {
2319     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2320              path, rrd_strerror(errno));
2321     close (fd);
2322     return (-1);
2323   }
2324
2325   status = listen (fd, /* backlog = */ 10);
2326   if (status != 0)
2327   {
2328     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2329              path, rrd_strerror(errno));
2330     close (fd);
2331     unlink (path);
2332     return (-1);
2333   }
2334
2335   listen_fds[listen_fds_num].fd = fd;
2336   listen_fds[listen_fds_num].family = PF_UNIX;
2337   strncpy(listen_fds[listen_fds_num].addr, path,
2338           sizeof (listen_fds[listen_fds_num].addr) - 1);
2339   listen_fds_num++;
2340
2341   return (0);
2342 } /* }}} int open_listen_socket_unix */
2343
2344 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2345 {
2346   struct addrinfo ai_hints;
2347   struct addrinfo *ai_res;
2348   struct addrinfo *ai_ptr;
2349   char addr_copy[NI_MAXHOST];
2350   char *addr;
2351   char *port;
2352   int status;
2353
2354   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2355   addr_copy[sizeof (addr_copy) - 1] = 0;
2356   addr = addr_copy;
2357
2358   memset (&ai_hints, 0, sizeof (ai_hints));
2359   ai_hints.ai_flags = 0;
2360 #ifdef AI_ADDRCONFIG
2361   ai_hints.ai_flags |= AI_ADDRCONFIG;
2362 #endif
2363   ai_hints.ai_family = AF_UNSPEC;
2364   ai_hints.ai_socktype = SOCK_STREAM;
2365
2366   port = NULL;
2367   if (*addr == '[') /* IPv6+port format */
2368   {
2369     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2370     addr++;
2371
2372     port = strchr (addr, ']');
2373     if (port == NULL)
2374     {
2375       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2376       return (-1);
2377     }
2378     *port = 0;
2379     port++;
2380
2381     if (*port == ':')
2382       port++;
2383     else if (*port == 0)
2384       port = NULL;
2385     else
2386     {
2387       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2388       return (-1);
2389     }
2390   } /* if (*addr = ']') */
2391   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2392   {
2393     port = rindex(addr, ':');
2394     if (port != NULL)
2395     {
2396       *port = 0;
2397       port++;
2398     }
2399   }
2400   ai_res = NULL;
2401   status = getaddrinfo (addr,
2402                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2403                         &ai_hints, &ai_res);
2404   if (status != 0)
2405   {
2406     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2407              addr, gai_strerror (status));
2408     return (-1);
2409   }
2410
2411   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2412   {
2413     int fd;
2414     listen_socket_t *temp;
2415     int one = 1;
2416
2417     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2418         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2419     if (temp == NULL)
2420     {
2421       fprintf (stderr,
2422                "rrdcached: open_listen_socket_network: realloc failed.\n");
2423       continue;
2424     }
2425     listen_fds = temp;
2426     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2427
2428     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2429     if (fd < 0)
2430     {
2431       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2432                rrd_strerror(errno));
2433       continue;
2434     }
2435
2436     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2437
2438     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2439     if (status != 0)
2440     {
2441       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2442                sock->addr, rrd_strerror(errno));
2443       close (fd);
2444       continue;
2445     }
2446
2447     status = listen (fd, /* backlog = */ 10);
2448     if (status != 0)
2449     {
2450       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2451                sock->addr, rrd_strerror(errno));
2452       close (fd);
2453       freeaddrinfo(ai_res);
2454       return (-1);
2455     }
2456
2457     listen_fds[listen_fds_num].fd = fd;
2458     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2459     listen_fds_num++;
2460   } /* for (ai_ptr) */
2461
2462   freeaddrinfo(ai_res);
2463   return (0);
2464 } /* }}} static int open_listen_socket_network */
2465
2466 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2467 {
2468   assert(sock != NULL);
2469   assert(sock->addr != NULL);
2470
2471   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2472       || sock->addr[0] == '/')
2473     return (open_listen_socket_unix(sock));
2474   else
2475     return (open_listen_socket_network(sock));
2476 } /* }}} int open_listen_socket */
2477
2478 static int close_listen_sockets (void) /* {{{ */
2479 {
2480   size_t i;
2481
2482   for (i = 0; i < listen_fds_num; i++)
2483   {
2484     close (listen_fds[i].fd);
2485
2486     if (listen_fds[i].family == PF_UNIX)
2487       unlink(listen_fds[i].addr);
2488   }
2489
2490   free (listen_fds);
2491   listen_fds = NULL;
2492   listen_fds_num = 0;
2493
2494   return (0);
2495 } /* }}} int close_listen_sockets */
2496
2497 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2498 {
2499   struct pollfd *pollfds;
2500   int pollfds_num;
2501   int status;
2502   int i;
2503
2504   if (listen_fds_num < 1)
2505   {
2506     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2507     return (NULL);
2508   }
2509
2510   pollfds_num = listen_fds_num;
2511   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2512   if (pollfds == NULL)
2513   {
2514     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2515     return (NULL);
2516   }
2517   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2518
2519   RRDD_LOG(LOG_INFO, "listening for connections");
2520
2521   while (state == RUNNING)
2522   {
2523     for (i = 0; i < pollfds_num; i++)
2524     {
2525       pollfds[i].fd = listen_fds[i].fd;
2526       pollfds[i].events = POLLIN | POLLPRI;
2527       pollfds[i].revents = 0;
2528     }
2529
2530     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2531     if (state != RUNNING)
2532       break;
2533     else if (status == 0) /* timeout */
2534       continue;
2535     else if (status < 0) /* error */
2536     {
2537       status = errno;
2538       if (status != EINTR)
2539       {
2540         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2541       }
2542       continue;
2543     }
2544
2545     for (i = 0; i < pollfds_num; i++)
2546     {
2547       listen_socket_t *client_sock;
2548       struct sockaddr_storage client_sa;
2549       socklen_t client_sa_size;
2550       pthread_t tid;
2551       pthread_attr_t attr;
2552
2553       if (pollfds[i].revents == 0)
2554         continue;
2555
2556       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2557       {
2558         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2559             "poll(2) returned something unexpected for listen FD #%i.",
2560             pollfds[i].fd);
2561         continue;
2562       }
2563
2564       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2565       if (client_sock == NULL)
2566       {
2567         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2568         continue;
2569       }
2570       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2571
2572       client_sa_size = sizeof (client_sa);
2573       client_sock->fd = accept (pollfds[i].fd,
2574           (struct sockaddr *) &client_sa, &client_sa_size);
2575       if (client_sock->fd < 0)
2576       {
2577         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2578         free(client_sock);
2579         continue;
2580       }
2581
2582       pthread_attr_init (&attr);
2583       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2584
2585       status = pthread_create (&tid, &attr, connection_thread_main,
2586                                client_sock);
2587       if (status != 0)
2588       {
2589         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2590         close_connection(client_sock);
2591         continue;
2592       }
2593     } /* for (pollfds_num) */
2594   } /* while (state == RUNNING) */
2595
2596   RRDD_LOG(LOG_INFO, "starting shutdown");
2597
2598   close_listen_sockets ();
2599
2600   pthread_mutex_lock (&connection_threads_lock);
2601   while (connection_threads_num > 0)
2602     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2603   pthread_mutex_unlock (&connection_threads_lock);
2604
2605   free(pollfds);
2606
2607   return (NULL);
2608 } /* }}} void *listen_thread_main */
2609
2610 static int daemonize (void) /* {{{ */
2611 {
2612   int pid_fd;
2613   char *base_dir;
2614
2615   daemon_uid = geteuid();
2616
2617   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2618   if (pid_fd < 0)
2619     pid_fd = check_pidfile();
2620   if (pid_fd < 0)
2621     return pid_fd;
2622
2623   /* open all the listen sockets */
2624   if (config_listen_address_list_len > 0)
2625   {
2626     for (size_t i = 0; i < config_listen_address_list_len; i++)
2627       open_listen_socket (config_listen_address_list[i]);
2628
2629     rrd_free_ptrs((void ***) &config_listen_address_list,
2630                   &config_listen_address_list_len);
2631   }
2632   else
2633   {
2634     listen_socket_t sock;
2635     memset(&sock, 0, sizeof(sock));
2636     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2637     open_listen_socket (&sock);
2638   }
2639
2640   if (listen_fds_num < 1)
2641   {
2642     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2643     goto error;
2644   }
2645
2646   if (!stay_foreground)
2647   {
2648     pid_t child;
2649
2650     child = fork ();
2651     if (child < 0)
2652     {
2653       fprintf (stderr, "daemonize: fork(2) failed.\n");
2654       goto error;
2655     }
2656     else if (child > 0)
2657       exit(0);
2658
2659     /* Become session leader */
2660     setsid ();
2661
2662     /* Open the first three file descriptors to /dev/null */
2663     close (2);
2664     close (1);
2665     close (0);
2666
2667     open ("/dev/null", O_RDWR);
2668     if (dup(0) == -1 || dup(0) == -1){
2669         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2670     }
2671   } /* if (!stay_foreground) */
2672
2673   /* Change into the /tmp directory. */
2674   base_dir = (config_base_dir != NULL)
2675     ? config_base_dir
2676     : "/tmp";
2677
2678   if (chdir (base_dir) != 0)
2679   {
2680     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2681     goto error;
2682   }
2683
2684   install_signal_handlers();
2685
2686   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2687   RRDD_LOG(LOG_INFO, "starting up");
2688
2689   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2690                                 (GDestroyNotify) free_cache_item);
2691   if (cache_tree == NULL)
2692   {
2693     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2694     goto error;
2695   }
2696
2697   return write_pidfile (pid_fd);
2698
2699 error:
2700   remove_pidfile();
2701   return -1;
2702 } /* }}} int daemonize */
2703
2704 static int cleanup (void) /* {{{ */
2705 {
2706   pthread_cond_broadcast (&flush_cond);
2707   pthread_join (flush_thread, NULL);
2708
2709   pthread_cond_broadcast (&queue_cond);
2710   for (int i = 0; i < config_queue_threads; i++)
2711     pthread_join (queue_threads[i], NULL);
2712
2713   if (config_flush_at_shutdown)
2714   {
2715     assert(cache_queue_head == NULL);
2716     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2717   }
2718
2719   free(queue_threads);
2720   free(config_base_dir);
2721
2722   pthread_mutex_lock(&cache_lock);
2723   g_tree_destroy(cache_tree);
2724
2725   pthread_mutex_lock(&journal_lock);
2726   journal_done();
2727
2728   RRDD_LOG(LOG_INFO, "goodbye");
2729   closelog ();
2730
2731   remove_pidfile ();
2732   free(config_pid_file);
2733
2734   return (0);
2735 } /* }}} int cleanup */
2736
2737 static int read_options (int argc, char **argv) /* {{{ */
2738 {
2739   int option;
2740   int status = 0;
2741
2742   char **permissions = NULL;
2743   size_t permissions_len = 0;
2744
2745   while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2746   {
2747     switch (option)
2748     {
2749       case 'g':
2750         stay_foreground=1;
2751         break;
2752
2753       case 'l':
2754       {
2755         listen_socket_t *new;
2756
2757         new = malloc(sizeof(listen_socket_t));
2758         if (new == NULL)
2759         {
2760           fprintf(stderr, "read_options: malloc failed.\n");
2761           return(2);
2762         }
2763         memset(new, 0, sizeof(listen_socket_t));
2764
2765         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2766
2767         /* Add permissions to the socket {{{ */
2768         if (permissions_len != 0)
2769         {
2770           size_t i;
2771           for (i = 0; i < permissions_len; i++)
2772           {
2773             status = socket_permission_add (new, permissions[i]);
2774             if (status != 0)
2775             {
2776               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2777                   "socket failed. Most likely, this permission doesn't "
2778                   "exist. Check your command line.\n", permissions[i]);
2779               status = 4;
2780             }
2781           }
2782         }
2783         else /* if (permissions_len == 0) */
2784         {
2785           /* Add permission for ALL commands to the socket. */
2786           size_t i;
2787           for (i = 0; i < list_of_commands_len; i++)
2788           {
2789             status = socket_permission_add (new, list_of_commands[i].cmd);
2790             if (status != 0)
2791             {
2792               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2793                   "socket failed. This should never happen, ever! Sorry.\n",
2794                   permissions[i]);
2795               status = 4;
2796             }
2797           }
2798         }
2799         /* }}} Done adding permissions. */
2800
2801         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2802                          &config_listen_address_list_len, new))
2803         {
2804           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2805           return (2);
2806         }
2807       }
2808       break;
2809
2810       case 'P':
2811       {
2812         char *optcopy;
2813         char *saveptr;
2814         char *dummy;
2815         char *ptr;
2816
2817         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2818
2819         optcopy = strdup (optarg);
2820         dummy = optcopy;
2821         saveptr = NULL;
2822         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2823         {
2824           dummy = NULL;
2825           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2826         }
2827
2828         free (optcopy);
2829       }
2830       break;
2831
2832       case 'f':
2833       {
2834         int temp;
2835
2836         temp = atoi (optarg);
2837         if (temp > 0)
2838           config_flush_interval = temp;
2839         else
2840         {
2841           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2842           status = 3;
2843         }
2844       }
2845       break;
2846
2847       case 'w':
2848       {
2849         int temp;
2850
2851         temp = atoi (optarg);
2852         if (temp > 0)
2853           config_write_interval = temp;
2854         else
2855         {
2856           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2857           status = 2;
2858         }
2859       }
2860       break;
2861
2862       case 'z':
2863       {
2864         int temp;
2865
2866         temp = atoi(optarg);
2867         if (temp > 0)
2868           config_write_jitter = temp;
2869         else
2870         {
2871           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2872           status = 2;
2873         }
2874
2875         break;
2876       }
2877
2878       case 't':
2879       {
2880         int threads;
2881         threads = atoi(optarg);
2882         if (threads >= 1)
2883           config_queue_threads = threads;
2884         else
2885         {
2886           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2887           return 1;
2888         }
2889       }
2890       break;
2891
2892       case 'B':
2893         config_write_base_only = 1;
2894         break;
2895
2896       case 'b':
2897       {
2898         size_t len;
2899         char base_realpath[PATH_MAX];
2900
2901         if (config_base_dir != NULL)
2902           free (config_base_dir);
2903         config_base_dir = strdup (optarg);
2904         if (config_base_dir == NULL)
2905         {
2906           fprintf (stderr, "read_options: strdup failed.\n");
2907           return (3);
2908         }
2909
2910         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2911         {
2912           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2913               config_base_dir, rrd_strerror (errno));
2914           return (3);
2915         }
2916
2917         /* make sure that the base directory is not resolved via
2918          * symbolic links.  this makes some performance-enhancing
2919          * assumptions possible (we don't have to resolve paths
2920          * that start with a "/")
2921          */
2922         if (realpath(config_base_dir, base_realpath) == NULL)
2923         {
2924           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2925               "%s\n", config_base_dir, rrd_strerror(errno));
2926           return 5;
2927         }
2928
2929         len = strlen (config_base_dir);
2930         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2931         {
2932           config_base_dir[len - 1] = 0;
2933           len--;
2934         }
2935
2936         if (len < 1)
2937         {
2938           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2939           return (4);
2940         }
2941
2942         _config_base_dir_len = len;
2943
2944         len = strlen (base_realpath);
2945         while ((len > 0) && (base_realpath[len - 1] == '/'))
2946         {
2947           base_realpath[len - 1] = '\0';
2948           len--;
2949         }
2950
2951         if (strncmp(config_base_dir,
2952                          base_realpath, sizeof(base_realpath)) != 0)
2953         {
2954           fprintf(stderr,
2955                   "Base directory (-b) resolved via file system links!\n"
2956                   "Please consult rrdcached '-b' documentation!\n"
2957                   "Consider specifying the real directory (%s)\n",
2958                   base_realpath);
2959           return 5;
2960         }
2961       }
2962       break;
2963
2964       case 'p':
2965       {
2966         if (config_pid_file != NULL)
2967           free (config_pid_file);
2968         config_pid_file = strdup (optarg);
2969         if (config_pid_file == NULL)
2970         {
2971           fprintf (stderr, "read_options: strdup failed.\n");
2972           return (3);
2973         }
2974       }
2975       break;
2976
2977       case 'F':
2978         config_flush_at_shutdown = 1;
2979         break;
2980
2981       case 'j':
2982       {
2983         const char *dir = journal_dir = strdup(optarg);
2984
2985         status = rrd_mkdir_p(dir, 0777);
2986         if (status != 0)
2987         {
2988           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2989               dir, rrd_strerror(errno));
2990           return 6;
2991         }
2992
2993         if (access(dir, R_OK|W_OK|X_OK) != 0)
2994         {
2995           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2996                   errno ? rrd_strerror(errno) : "");
2997           return 6;
2998         }
2999       }
3000       break;
3001
3002       case 'h':
3003       case '?':
3004         printf ("RRDCacheD %s\n"
3005             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3006             "\n"
3007             "Usage: rrdcached [options]\n"
3008             "\n"
3009             "Valid options are:\n"
3010             "  -l <address>  Socket address to listen to.\n"
3011             "  -P <perms>    Sets the permissions to assign to all following "
3012                             "sockets\n"
3013             "  -w <seconds>  Interval in which to write data.\n"
3014             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3015             "  -t <threads>  Number of write threads.\n"
3016             "  -f <seconds>  Interval in which to flush dead data.\n"
3017             "  -p <file>     Location of the PID-file.\n"
3018             "  -b <dir>      Base directory to change to.\n"
3019             "  -B            Restrict file access to paths within -b <dir>\n"
3020             "  -g            Do not fork and run in the foreground.\n"
3021             "  -j <dir>      Directory in which to create the journal files.\n"
3022             "  -F            Always flush all updates at shutdown\n"
3023             "\n"
3024             "For more information and a detailed description of all options "
3025             "please refer\n"
3026             "to the rrdcached(1) manual page.\n",
3027             VERSION);
3028         status = -1;
3029         break;
3030     } /* switch (option) */
3031   } /* while (getopt) */
3032
3033   /* advise the user when values are not sane */
3034   if (config_flush_interval < 2 * config_write_interval)
3035     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3036             " 2x write interval (-w) !\n");
3037   if (config_write_jitter > config_write_interval)
3038     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3039             " write interval (-w) !\n");
3040
3041   if (config_write_base_only && config_base_dir == NULL)
3042     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3043             "  Consult the rrdcached documentation\n");
3044
3045   if (journal_dir == NULL)
3046     config_flush_at_shutdown = 1;
3047
3048   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3049
3050   return (status);
3051 } /* }}} int read_options */
3052
3053 int main (int argc, char **argv)
3054 {
3055   int status;
3056
3057   status = read_options (argc, argv);
3058   if (status != 0)
3059   {
3060     if (status < 0)
3061       status = 0;
3062     return (status);
3063   }
3064
3065   status = daemonize ();
3066   if (status != 0)
3067   {
3068     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3069     return (1);
3070   }
3071
3072   journal_init();
3073
3074   /* start the queue threads */
3075   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3076   if (queue_threads == NULL)
3077   {
3078     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3079     cleanup();
3080     return (1);
3081   }
3082   for (int i = 0; i < config_queue_threads; i++)
3083   {
3084     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3085     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3086     if (status != 0)
3087     {
3088       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3089       cleanup();
3090       return (1);
3091     }
3092   }
3093
3094   /* start the flush thread */
3095   memset(&flush_thread, 0, sizeof(flush_thread));
3096   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3097   if (status != 0)
3098   {
3099     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3100     cleanup();
3101     return (1);
3102   }
3103
3104   listen_thread_main (NULL);
3105   cleanup ();
3106
3107   return (0);
3108 } /* int main */
3109
3110 /*
3111  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3112  */