rrdcached: Updated help output
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
110
111 #include <glib-2.0/glib.h>
112 /* }}} */
113
114 #define RRDD_LOG(severity, ...) \
115   do { \
116     if (stay_foreground) \
117       fprintf(stderr, __VA_ARGS__); \
118     syslog ((severity), __VA_ARGS__); \
119   } while (0)
120
121 #ifndef __GNUC__
122 # define __attribute__(x) /**/
123 #endif
124
125 /*
126  * Types
127  */
128 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
129
130 struct listen_socket_s
131 {
132   int fd;
133   char addr[PATH_MAX + 1];
134   int family;
135
136   /* state for BATCH processing */
137   time_t batch_start;
138   int batch_cmd;
139
140   /* buffered IO */
141   char *rbuf;
142   off_t next_cmd;
143   off_t next_read;
144
145   char *wbuf;
146   ssize_t wbuf_len;
147
148   uint32_t permissions;
149
150   gid_t  socket_group;
151   mode_t socket_permissions;
152 };
153 typedef struct listen_socket_s listen_socket_t;
154
155 struct command_s;
156 typedef struct command_s command_t;
157 /* note: guard against "unused" warnings in the handlers */
158 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
159                         time_t now              __attribute__((unused)),\
160                         char  *buffer           __attribute__((unused)),\
161                         size_t buffer_size      __attribute__((unused))
162
163 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
164                         DISPATCH_PROTO
165
166 struct command_s {
167   char   *cmd;
168   int (*handler)(HANDLER_PROTO);
169
170   char  context;                /* where we expect to see it */
171 #define CMD_CONTEXT_CLIENT      (1<<0)
172 #define CMD_CONTEXT_BATCH       (1<<1)
173 #define CMD_CONTEXT_JOURNAL     (1<<2)
174 #define CMD_CONTEXT_ANY         (0x7f)
175
176   char *syntax;
177   char *help;
178 };
179
180 struct cache_item_s;
181 typedef struct cache_item_s cache_item_t;
182 struct cache_item_s
183 {
184   char *file;
185   char **values;
186   size_t values_num;
187   time_t last_flush_time;
188   time_t last_update_stamp;
189 #define CI_FLAGS_IN_TREE  (1<<0)
190 #define CI_FLAGS_IN_QUEUE (1<<1)
191   int flags;
192   pthread_cond_t  flushed;
193   cache_item_t *prev;
194   cache_item_t *next;
195 };
196
197 struct callback_flush_data_s
198 {
199   time_t now;
200   time_t abs_timeout;
201   char **keys;
202   size_t keys_num;
203 };
204 typedef struct callback_flush_data_s callback_flush_data_t;
205
206 enum queue_side_e
207 {
208   HEAD,
209   TAIL
210 };
211 typedef enum queue_side_e queue_side_t;
212
213 /* describe a set of journal files */
214 typedef struct {
215   char **files;
216   size_t files_num;
217 } journal_set;
218
219 /* max length of socket command or response */
220 #define CMD_MAX 4096
221 #define RBUF_SIZE (CMD_MAX*2)
222
223 /*
224  * Variables
225  */
226 static int stay_foreground = 0;
227 static uid_t daemon_uid;
228
229 static listen_socket_t *listen_fds = NULL;
230 static size_t listen_fds_num = 0;
231
232 enum {
233   RUNNING,              /* normal operation */
234   FLUSHING,             /* flushing remaining values */
235   SHUTDOWN              /* shutting down */
236 } state = RUNNING;
237
238 static pthread_t *queue_threads;
239 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
240 static int config_queue_threads = 4;
241
242 static pthread_t flush_thread;
243 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
244
245 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
246 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
247 static int connection_threads_num = 0;
248
249 /* Cache stuff */
250 static GTree          *cache_tree = NULL;
251 static cache_item_t   *cache_queue_head = NULL;
252 static cache_item_t   *cache_queue_tail = NULL;
253 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
254
255 static int config_write_interval = 300;
256 static int config_write_jitter   = 0;
257 static int config_flush_interval = 3600;
258 static int config_flush_at_shutdown = 0;
259 static char *config_pid_file = NULL;
260 static char *config_base_dir = NULL;
261 static size_t _config_base_dir_len = 0;
262 static int config_write_base_only = 0;
263
264 static listen_socket_t **config_listen_address_list = NULL;
265 static size_t config_listen_address_list_len = 0;
266
267 static uint64_t stats_queue_length = 0;
268 static uint64_t stats_updates_received = 0;
269 static uint64_t stats_flush_received = 0;
270 static uint64_t stats_updates_written = 0;
271 static uint64_t stats_data_sets_written = 0;
272 static uint64_t stats_journal_bytes = 0;
273 static uint64_t stats_journal_rotate = 0;
274 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
275
276 /* Journaled updates */
277 #define JOURNAL_REPLAY(s) ((s) == NULL)
278 #define JOURNAL_BASE "rrd.journal"
279 static journal_set *journal_cur = NULL;
280 static journal_set *journal_old = NULL;
281 static char *journal_dir = NULL;
282 static FILE *journal_fh = NULL;         /* current journal file handle */
283 static long  journal_size = 0;          /* current journal size */
284 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
285 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
286 static int journal_write(char *cmd, char *args);
287 static void journal_done(void);
288 static void journal_rotate(void);
289
290 /* prototypes for forward refernces */
291 static int handle_request_help (HANDLER_PROTO);
292
293 /* 
294  * Functions
295  */
296 static void sig_common (const char *sig) /* {{{ */
297 {
298   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299   state = FLUSHING;
300   pthread_cond_broadcast(&flush_cond);
301   pthread_cond_broadcast(&queue_cond);
302 } /* }}} void sig_common */
303
304 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
305 {
306   sig_common("INT");
307 } /* }}} void sig_int_handler */
308
309 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
310 {
311   sig_common("TERM");
312 } /* }}} void sig_term_handler */
313
314 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
315 {
316   config_flush_at_shutdown = 1;
317   sig_common("USR1");
318 } /* }}} void sig_usr1_handler */
319
320 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
321 {
322   config_flush_at_shutdown = 0;
323   sig_common("USR2");
324 } /* }}} void sig_usr2_handler */
325
326 static void install_signal_handlers(void) /* {{{ */
327 {
328   /* These structures are static, because `sigaction' behaves weird if the are
329    * overwritten.. */
330   static struct sigaction sa_int;
331   static struct sigaction sa_term;
332   static struct sigaction sa_pipe;
333   static struct sigaction sa_usr1;
334   static struct sigaction sa_usr2;
335
336   /* Install signal handlers */
337   memset (&sa_int, 0, sizeof (sa_int));
338   sa_int.sa_handler = sig_int_handler;
339   sigaction (SIGINT, &sa_int, NULL);
340
341   memset (&sa_term, 0, sizeof (sa_term));
342   sa_term.sa_handler = sig_term_handler;
343   sigaction (SIGTERM, &sa_term, NULL);
344
345   memset (&sa_pipe, 0, sizeof (sa_pipe));
346   sa_pipe.sa_handler = SIG_IGN;
347   sigaction (SIGPIPE, &sa_pipe, NULL);
348
349   memset (&sa_pipe, 0, sizeof (sa_usr1));
350   sa_usr1.sa_handler = sig_usr1_handler;
351   sigaction (SIGUSR1, &sa_usr1, NULL);
352
353   memset (&sa_usr2, 0, sizeof (sa_usr2));
354   sa_usr2.sa_handler = sig_usr2_handler;
355   sigaction (SIGUSR2, &sa_usr2, NULL);
356
357 } /* }}} void install_signal_handlers */
358
359 static int open_pidfile(char *action, int oflag) /* {{{ */
360 {
361   int fd;
362   const char *file;
363   char *file_copy, *dir;
364
365   file = (config_pid_file != NULL)
366     ? config_pid_file
367     : LOCALSTATEDIR "/run/rrdcached.pid";
368
369   /* dirname may modify its argument */
370   file_copy = strdup(file);
371   if (file_copy == NULL)
372   {
373     fprintf(stderr, "rrdcached: strdup(): %s\n",
374         rrd_strerror(errno));
375     return -1;
376   }
377
378   dir = dirname(file_copy);
379   if (rrd_mkdir_p(dir, 0777) != 0)
380   {
381     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
382         dir, rrd_strerror(errno));
383     return -1;
384   }
385
386   free(file_copy);
387
388   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389   if (fd < 0)
390     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
391             action, file, rrd_strerror(errno));
392
393   return(fd);
394 } /* }}} static int open_pidfile */
395
396 /* check existing pid file to see whether a daemon is running */
397 static int check_pidfile(void)
398 {
399   int pid_fd;
400   pid_t pid;
401   char pid_str[16];
402
403   pid_fd = open_pidfile("open", O_RDWR);
404   if (pid_fd < 0)
405     return pid_fd;
406
407   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
408     return -1;
409
410   pid = atoi(pid_str);
411   if (pid <= 0)
412     return -1;
413
414   /* another running process that we can signal COULD be
415    * a competing rrdcached */
416   if (pid != getpid() && kill(pid, 0) == 0)
417   {
418     fprintf(stderr,
419             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
420     close(pid_fd);
421     return -1;
422   }
423
424   lseek(pid_fd, 0, SEEK_SET);
425   if (ftruncate(pid_fd, 0) == -1)
426   {
427     fprintf(stderr,
428             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
429     close(pid_fd);
430     return -1;
431   }
432
433   fprintf(stderr,
434           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
435           "rrdcached: starting normally.\n", pid);
436
437   return pid_fd;
438 } /* }}} static int check_pidfile */
439
440 static int write_pidfile (int fd) /* {{{ */
441 {
442   pid_t pid;
443   FILE *fh;
444
445   pid = getpid ();
446
447   fh = fdopen (fd, "w");
448   if (fh == NULL)
449   {
450     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
451     close(fd);
452     return (-1);
453   }
454
455   fprintf (fh, "%i\n", (int) pid);
456   fclose (fh);
457
458   return (0);
459 } /* }}} int write_pidfile */
460
461 static int remove_pidfile (void) /* {{{ */
462 {
463   char *file;
464   int status;
465
466   file = (config_pid_file != NULL)
467     ? config_pid_file
468     : LOCALSTATEDIR "/run/rrdcached.pid";
469
470   status = unlink (file);
471   if (status == 0)
472     return (0);
473   return (errno);
474 } /* }}} int remove_pidfile */
475
476 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
477 {
478   char *eol;
479
480   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
481                sock->next_read - sock->next_cmd);
482
483   if (eol == NULL)
484   {
485     /* no commands left, move remainder back to front of rbuf */
486     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
487             sock->next_read - sock->next_cmd);
488     sock->next_read -= sock->next_cmd;
489     sock->next_cmd = 0;
490     *len = 0;
491     return NULL;
492   }
493   else
494   {
495     char *cmd = sock->rbuf + sock->next_cmd;
496     *eol = '\0';
497
498     sock->next_cmd = eol - sock->rbuf + 1;
499
500     if (eol > sock->rbuf && *(eol-1) == '\r')
501       *(--eol) = '\0'; /* handle "\r\n" EOL */
502
503     *len = eol - cmd;
504
505     return cmd;
506   }
507
508   /* NOTREACHED */
509   assert(1==0);
510 } /* }}} char *next_cmd */
511
512 /* add the characters directly to the write buffer */
513 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
514 {
515   char *new_buf;
516
517   assert(sock != NULL);
518
519   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
520   if (new_buf == NULL)
521   {
522     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
523     return -1;
524   }
525
526   strncpy(new_buf + sock->wbuf_len, str, len + 1);
527
528   sock->wbuf = new_buf;
529   sock->wbuf_len += len;
530
531   return 0;
532 } /* }}} static int add_to_wbuf */
533
534 /* add the text to the "extra" info that's sent after the status line */
535 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
536 {
537   va_list argp;
538   char buffer[CMD_MAX];
539   int len;
540
541   if (JOURNAL_REPLAY(sock)) return 0;
542   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
543
544   va_start(argp, fmt);
545 #ifdef HAVE_VSNPRINTF
546   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
547 #else
548   len = vsprintf(buffer, fmt, argp);
549 #endif
550   va_end(argp);
551   if (len < 0)
552   {
553     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
554     return -1;
555   }
556
557   return add_to_wbuf(sock, buffer, len);
558 } /* }}} static int add_response_info */
559
560 static int count_lines(char *str) /* {{{ */
561 {
562   int lines = 0;
563
564   if (str != NULL)
565   {
566     while ((str = strchr(str, '\n')) != NULL)
567     {
568       ++lines;
569       ++str;
570     }
571   }
572
573   return lines;
574 } /* }}} static int count_lines */
575
576 /* send the response back to the user.
577  * returns 0 on success, -1 on error
578  * write buffer is always zeroed after this call */
579 static int send_response (listen_socket_t *sock, response_code rc,
580                           char *fmt, ...) /* {{{ */
581 {
582   va_list argp;
583   char buffer[CMD_MAX];
584   int lines;
585   ssize_t wrote;
586   int rclen, len;
587
588   if (JOURNAL_REPLAY(sock)) return rc;
589
590   if (sock->batch_start)
591   {
592     if (rc == RESP_OK)
593       return rc; /* no response on success during BATCH */
594     lines = sock->batch_cmd;
595   }
596   else if (rc == RESP_OK)
597     lines = count_lines(sock->wbuf);
598   else
599     lines = -1;
600
601   rclen = sprintf(buffer, "%d ", lines);
602   va_start(argp, fmt);
603 #ifdef HAVE_VSNPRINTF
604   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 #else
606   len = vsprintf(buffer+rclen, fmt, argp);
607 #endif
608   va_end(argp);
609   if (len < 0)
610     return -1;
611
612   len += rclen;
613
614   /* append the result to the wbuf, don't write to the user */
615   if (sock->batch_start)
616     return add_to_wbuf(sock, buffer, len);
617
618   /* first write must be complete */
619   if (len != write(sock->fd, buffer, len))
620   {
621     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
622     return -1;
623   }
624
625   if (sock->wbuf != NULL && rc == RESP_OK)
626   {
627     wrote = 0;
628     while (wrote < sock->wbuf_len)
629     {
630       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
631       if (wb <= 0)
632       {
633         RRDD_LOG(LOG_INFO, "send_response: could not write results");
634         return -1;
635       }
636       wrote += wb;
637     }
638   }
639
640   free(sock->wbuf); sock->wbuf = NULL;
641   sock->wbuf_len = 0;
642
643   return 0;
644 } /* }}} */
645
646 static void wipe_ci_values(cache_item_t *ci, time_t when)
647 {
648   ci->values = NULL;
649   ci->values_num = 0;
650
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
654 }
655
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
661 {
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
664
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
669
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
674
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
677
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
682
683 } /* }}} static void remove_from_queue */
684
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
689 {
690   if (ci == NULL) return NULL;
691
692   remove_from_queue(ci);
693
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
696
697   free (ci->values);
698   free (ci->file);
699
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
703
704   free (ci);
705
706   return NULL;
707 } /* }}} static void *free_cache_item */
708
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
715 {
716   if (ci == NULL)
717     return (-1);
718
719   if (ci->values_num == 0)
720     return (0);
721
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
726
727     /* remove if further down in queue */
728     remove_from_queue(ci);
729
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
735
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
744
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
747
748     ci->prev = cache_queue_tail;
749
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
754
755     cache_queue_tail = ci;
756   }
757
758   ci->flags |= CI_FLAGS_IN_QUEUE;
759
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
764
765   return (0);
766 } /* }}} int enqueue_cache_item */
767
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
775 {
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
778
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
781
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
784
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
800
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
803
804 static int flush_old_values (int max_age)
805 {
806   callback_flush_data_t cfd;
807   size_t k;
808
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
815
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
820
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
826
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
834
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
840
841   return (0);
842 } /* int flush_old_values */
843
844 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
845 {
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
849
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
853
854   pthread_mutex_lock(&cache_lock);
855
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
864
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
867
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
871
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
878
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
886
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
889
890   state = SHUTDOWN;
891
892   pthread_mutex_unlock(&cache_lock);
893
894   return NULL;
895 } /* void *flush_thread_main */
896
897 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
898 {
899   pthread_mutex_lock (&cache_lock);
900
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
909
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
921
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
926
927     ci = cache_queue_head;
928
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
936
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
939
940     values = ci->values;
941     values_num = ci->values_num;
942
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
945
946     pthread_mutex_unlock (&cache_lock);
947
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
956
957     journal_write("wrote", file);
958
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
966
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
974
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
977
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
981
982   return (NULL);
983 } /* }}} void *queue_thread_main */
984
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
987 {
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
994
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1000
1001   if (buffer_size <= 0)
1002     return (-1);
1003
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1006
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1037
1038   if (status != 0)
1039     return (status);
1040
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1044
1045   return (0);
1046 } /* }}} int buffer_get_field */
1047
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1053 {
1054   assert(file != NULL);
1055
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1060
1061   if (strstr(file, "../") != NULL) goto err;
1062
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1065
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1070
1071   return 1;
1072
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1076
1077   return 0;
1078 } /* }}} static int check_file_access */
1079
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1089 {
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1092
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1095
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1099
1100 static int flush_file (const char *filename) /* {{{ */
1101 {
1102   cache_item_t *ci;
1103
1104   pthread_mutex_lock (&cache_lock);
1105
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1112
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1119
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1122
1123   pthread_mutex_unlock(&cache_lock);
1124
1125   return (0);
1126 } /* }}} int flush_file */
1127
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1129 {
1130   char *err = "Syntax error.\n";
1131
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1134
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1137
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1139 {
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1147
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1150
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1160
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1165
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1180
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1182
1183   return (0);
1184 } /* }}} int handle_request_stats */
1185
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1187 {
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1190
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1201
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1204
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1212
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1224
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1228
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1230 {
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1232
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1236
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1239
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1241 {
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1245
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1249
1250   get_abs_path(&file, file_tmp);
1251
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1259
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1262
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1266
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1268 {
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1272
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1276
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1279
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1283
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1288
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1293
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1297
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1299 {
1300   cache_item_t *ci;
1301
1302   pthread_mutex_lock(&cache_lock);
1303
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1310
1311   pthread_mutex_unlock(&cache_lock);
1312
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1315
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1317 {
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[CMD_MAX];
1322
1323   cache_item_t *ci;
1324
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, buffer_size);
1328
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1332
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1336
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1339
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1342
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1347
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1350
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1356
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1370
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1379
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1388
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1392
1393     pthread_mutex_lock(&cache_lock);
1394
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1404
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1410
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1414
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     time_t stamp;
1419     char *eostamp;
1420
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1427
1428     /* make sure update time is always moving forward */
1429     stamp = strtol(value, &eostamp, 10);
1430     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "Cannot find timestamp in '%s'!\n", value);
1435     }
1436     else if (stamp <= ci->last_update_stamp)
1437     {
1438       pthread_mutex_unlock(&cache_lock);
1439       return send_response(sock, RESP_ERR,
1440                            "illegal attempt to update using time %ld when last"
1441                            " update time is %ld (minimum one second step)\n",
1442                            stamp, ci->last_update_stamp);
1443     }
1444     else
1445       ci->last_update_stamp = stamp;
1446
1447     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1448     {
1449       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1450       continue;
1451     }
1452
1453     values_num++;
1454   }
1455
1456   if (((now - ci->last_flush_time) >= config_write_interval)
1457       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458       && (ci->values_num > 0))
1459   {
1460     enqueue_cache_item (ci, TAIL);
1461   }
1462
1463   pthread_mutex_unlock (&cache_lock);
1464
1465   if (values_num < 1)
1466     return send_response(sock, RESP_ERR, "No values updated.\n");
1467   else
1468     return send_response(sock, RESP_OK,
1469                          "errors, enqueued %i value(s).\n", values_num);
1470
1471   /* NOTREACHED */
1472   assert(1==0);
1473
1474 } /* }}} int handle_request_update */
1475
1476 /* we came across a "WROTE" entry during journal replay.
1477  * throw away any values that we have accumulated for this file
1478  */
1479 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1480 {
1481   cache_item_t *ci;
1482   const char *file = buffer;
1483
1484   pthread_mutex_lock(&cache_lock);
1485
1486   ci = g_tree_lookup(cache_tree, file);
1487   if (ci == NULL)
1488   {
1489     pthread_mutex_unlock(&cache_lock);
1490     return (0);
1491   }
1492
1493   if (ci->values)
1494     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1495
1496   wipe_ci_values(ci, now);
1497   remove_from_queue(ci);
1498
1499   pthread_mutex_unlock(&cache_lock);
1500   return (0);
1501 } /* }}} int handle_request_wrote */
1502
1503 /* start "BATCH" processing */
1504 static int batch_start (HANDLER_PROTO) /* {{{ */
1505 {
1506   int status;
1507   if (sock->batch_start)
1508     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1509
1510   status = send_response(sock, RESP_OK,
1511                          "Go ahead.  End with dot '.' on its own line.\n");
1512   sock->batch_start = time(NULL);
1513   sock->batch_cmd = 0;
1514
1515   return status;
1516 } /* }}} static int batch_start */
1517
1518 /* finish "BATCH" processing and return results to the client */
1519 static int batch_done (HANDLER_PROTO) /* {{{ */
1520 {
1521   assert(sock->batch_start);
1522   sock->batch_start = 0;
1523   sock->batch_cmd  = 0;
1524   return send_response(sock, RESP_OK, "errors\n");
1525 } /* }}} static int batch_done */
1526
1527 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1528 {
1529   return -1;
1530 } /* }}} static int handle_request_quit */
1531
1532 static command_t list_of_commands[] = { /* {{{ */
1533   {
1534     "UPDATE",
1535     handle_request_update,
1536     CMD_CONTEXT_ANY,
1537     "UPDATE <filename> <values> [<values> ...]\n"
1538     ,
1539     "Adds the given file to the internal cache if it is not yet known and\n"
1540     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1541     "for details.\n"
1542     "\n"
1543     "Each <values> has the following form:\n"
1544     "  <values> = <time>:<value>[:<value>[...]]\n"
1545     "See the rrdupdate(1) manpage for details.\n"
1546   },
1547   {
1548     "WROTE",
1549     handle_request_wrote,
1550     CMD_CONTEXT_JOURNAL,
1551     NULL,
1552     NULL
1553   },
1554   {
1555     "FLUSH",
1556     handle_request_flush,
1557     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1558     "FLUSH <filename>\n"
1559     ,
1560     "Adds the given filename to the head of the update queue and returns\n"
1561     "after it has been dequeued.\n"
1562   },
1563   {
1564     "FLUSHALL",
1565     handle_request_flushall,
1566     CMD_CONTEXT_CLIENT,
1567     "FLUSHALL\n"
1568     ,
1569     "Triggers writing of all pending updates.  Returns immediately.\n"
1570   },
1571   {
1572     "PENDING",
1573     handle_request_pending,
1574     CMD_CONTEXT_CLIENT,
1575     "PENDING <filename>\n"
1576     ,
1577     "Shows any 'pending' updates for a file, in order.\n"
1578     "The updates shown have not yet been written to the underlying RRD file.\n"
1579   },
1580   {
1581     "FORGET",
1582     handle_request_forget,
1583     CMD_CONTEXT_ANY,
1584     "FORGET <filename>\n"
1585     ,
1586     "Removes the file completely from the cache.\n"
1587     "Any pending updates for the file will be lost.\n"
1588   },
1589   {
1590     "QUEUE",
1591     handle_request_queue,
1592     CMD_CONTEXT_CLIENT,
1593     "QUEUE\n"
1594     ,
1595         "Shows all files in the output queue.\n"
1596     "The output is zero or more lines in the following format:\n"
1597     "(where <num_vals> is the number of values to be written)\n"
1598     "\n"
1599     "<num_vals> <filename>\n"
1600   },
1601   {
1602     "STATS",
1603     handle_request_stats,
1604     CMD_CONTEXT_CLIENT,
1605     "STATS\n"
1606     ,
1607     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1608     "a description of the values.\n"
1609   },
1610   {
1611     "HELP",
1612     handle_request_help,
1613     CMD_CONTEXT_CLIENT,
1614     "HELP [<command>]\n",
1615     NULL, /* special! */
1616   },
1617   {
1618     "BATCH",
1619     batch_start,
1620     CMD_CONTEXT_CLIENT,
1621     "BATCH\n"
1622     ,
1623     "The 'BATCH' command permits the client to initiate a bulk load\n"
1624     "   of commands to rrdcached.\n"
1625     "\n"
1626     "Usage:\n"
1627     "\n"
1628     "    client: BATCH\n"
1629     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1630     "    client: command #1\n"
1631     "    client: command #2\n"
1632     "    client: ... and so on\n"
1633     "    client: .\n"
1634     "    server: 2 errors\n"
1635     "    server: 7 message for command #7\n"
1636     "    server: 9 message for command #9\n"
1637     "\n"
1638     "For more information, consult the rrdcached(1) documentation.\n"
1639   },
1640   {
1641     ".",   /* BATCH terminator */
1642     batch_done,
1643     CMD_CONTEXT_BATCH,
1644     NULL,
1645     NULL
1646   },
1647   {
1648     "QUIT",
1649     handle_request_quit,
1650     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1651     "QUIT\n"
1652     ,
1653     "Disconnect from rrdcached.\n"
1654   }
1655 }; /* }}} command_t list_of_commands[] */
1656 static size_t list_of_commands_len = sizeof (list_of_commands)
1657   / sizeof (list_of_commands[0]);
1658
1659 static command_t *find_command(char *cmd)
1660 {
1661   size_t i;
1662
1663   for (i = 0; i < list_of_commands_len; i++)
1664     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1665       return (&list_of_commands[i]);
1666   return NULL;
1667 }
1668
1669 /* We currently use the index in the `list_of_commands' array as a bit position
1670  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1671  * outside these functions so that switching to a more elegant storage method
1672  * is easily possible. */
1673 static ssize_t find_command_index (const char *cmd) /* {{{ */
1674 {
1675   size_t i;
1676
1677   for (i = 0; i < list_of_commands_len; i++)
1678     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1679       return ((ssize_t) i);
1680   return (-1);
1681 } /* }}} ssize_t find_command_index */
1682
1683 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1684     const char *cmd)
1685 {
1686   ssize_t i;
1687
1688   if (JOURNAL_REPLAY(sock))
1689     return (1);
1690
1691   if (cmd == NULL)
1692     return (-1);
1693
1694   if ((strcasecmp ("QUIT", cmd) == 0)
1695       || (strcasecmp ("HELP", cmd) == 0))
1696     return (1);
1697   else if (strcmp (".", cmd) == 0)
1698     cmd = "BATCH";
1699
1700   i = find_command_index (cmd);
1701   if (i < 0)
1702     return (-1);
1703   assert (i < 32);
1704
1705   if ((sock->permissions & (1 << i)) != 0)
1706     return (1);
1707   return (0);
1708 } /* }}} int socket_permission_check */
1709
1710 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1711     const char *cmd)
1712 {
1713   ssize_t i;
1714
1715   i = find_command_index (cmd);
1716   if (i < 0)
1717     return (-1);
1718   assert (i < 32);
1719
1720   sock->permissions |= (1 << i);
1721   return (0);
1722 } /* }}} int socket_permission_add */
1723
1724 /* check whether commands are received in the expected context */
1725 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1726 {
1727   if (JOURNAL_REPLAY(sock))
1728     return (cmd->context & CMD_CONTEXT_JOURNAL);
1729   else if (sock->batch_start)
1730     return (cmd->context & CMD_CONTEXT_BATCH);
1731   else
1732     return (cmd->context & CMD_CONTEXT_CLIENT);
1733
1734   /* NOTREACHED */
1735   assert(1==0);
1736 }
1737
1738 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1739 {
1740   int status;
1741   char *cmd_str;
1742   char *resp_txt;
1743   command_t *help = NULL;
1744
1745   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1746   if (status == 0)
1747     help = find_command(cmd_str);
1748
1749   if (help && (help->syntax || help->help))
1750   {
1751     char tmp[CMD_MAX];
1752
1753     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1754     resp_txt = tmp;
1755
1756     if (help->syntax)
1757       add_response_info(sock, "Usage: %s\n", help->syntax);
1758
1759     if (help->help)
1760       add_response_info(sock, "%s\n", help->help);
1761   }
1762   else
1763   {
1764     size_t i;
1765
1766     resp_txt = "Command overview\n";
1767
1768     for (i = 0; i < list_of_commands_len; i++)
1769     {
1770       if (list_of_commands[i].syntax == NULL)
1771         continue;
1772       add_response_info (sock, "%s", list_of_commands[i].syntax);
1773     }
1774   }
1775
1776   return send_response(sock, RESP_OK, resp_txt);
1777 } /* }}} int handle_request_help */
1778
1779 static int handle_request (DISPATCH_PROTO) /* {{{ */
1780 {
1781   char *buffer_ptr = buffer;
1782   char *cmd_str = NULL;
1783   command_t *cmd = NULL;
1784   int status;
1785
1786   assert (buffer[buffer_size - 1] == '\0');
1787
1788   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1789   if (status != 0)
1790   {
1791     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1792     return (-1);
1793   }
1794
1795   if (sock != NULL && sock->batch_start)
1796     sock->batch_cmd++;
1797
1798   cmd = find_command(cmd_str);
1799   if (!cmd)
1800     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1801
1802   if (!socket_permission_check (sock, cmd->cmd))
1803     return send_response(sock, RESP_ERR, "Permission denied.\n");
1804
1805   if (!command_check_context(sock, cmd))
1806     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1807
1808   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1809 } /* }}} int handle_request */
1810
1811 static void journal_set_free (journal_set *js) /* {{{ */
1812 {
1813   if (js == NULL)
1814     return;
1815
1816   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1817
1818   free(js);
1819 } /* }}} journal_set_free */
1820
1821 static void journal_set_remove (journal_set *js) /* {{{ */
1822 {
1823   if (js == NULL)
1824     return;
1825
1826   for (uint i=0; i < js->files_num; i++)
1827   {
1828     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1829     unlink(js->files[i]);
1830   }
1831 } /* }}} journal_set_remove */
1832
1833 /* close current journal file handle.
1834  * MUST hold journal_lock before calling */
1835 static void journal_close(void) /* {{{ */
1836 {
1837   if (journal_fh != NULL)
1838   {
1839     if (fclose(journal_fh) != 0)
1840       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1841   }
1842
1843   journal_fh = NULL;
1844   journal_size = 0;
1845 } /* }}} journal_close */
1846
1847 /* MUST hold journal_lock before calling */
1848 static void journal_new_file(void) /* {{{ */
1849 {
1850   struct timeval now;
1851   int  new_fd;
1852   char new_file[PATH_MAX + 1];
1853
1854   assert(journal_dir != NULL);
1855   assert(journal_cur != NULL);
1856
1857   journal_close();
1858
1859   gettimeofday(&now, NULL);
1860   /* this format assures that the files sort in strcmp() order */
1861   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1862            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1863
1864   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1865                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1866   if (new_fd < 0)
1867     goto error;
1868
1869   journal_fh = fdopen(new_fd, "a");
1870   if (journal_fh == NULL)
1871     goto error;
1872
1873   journal_size = ftell(journal_fh);
1874   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1875
1876   /* record the file in the journal set */
1877   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1878
1879   return;
1880
1881 error:
1882   RRDD_LOG(LOG_CRIT,
1883            "JOURNALING DISABLED: Error while trying to create %s : %s",
1884            new_file, rrd_strerror(errno));
1885   RRDD_LOG(LOG_CRIT,
1886            "JOURNALING DISABLED: All values will be flushed at shutdown");
1887
1888   close(new_fd);
1889   config_flush_at_shutdown = 1;
1890
1891 } /* }}} journal_new_file */
1892
1893 /* MUST NOT hold journal_lock before calling this */
1894 static void journal_rotate(void) /* {{{ */
1895 {
1896   journal_set *old_js = NULL;
1897
1898   if (journal_dir == NULL)
1899     return;
1900
1901   RRDD_LOG(LOG_DEBUG, "rotating journals");
1902
1903   pthread_mutex_lock(&stats_lock);
1904   ++stats_journal_rotate;
1905   pthread_mutex_unlock(&stats_lock);
1906
1907   pthread_mutex_lock(&journal_lock);
1908
1909   journal_close();
1910
1911   /* rotate the journal sets */
1912   old_js = journal_old;
1913   journal_old = journal_cur;
1914   journal_cur = calloc(1, sizeof(journal_set));
1915
1916   if (journal_cur != NULL)
1917     journal_new_file();
1918   else
1919     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1920
1921   pthread_mutex_unlock(&journal_lock);
1922
1923   journal_set_remove(old_js);
1924   journal_set_free  (old_js);
1925
1926 } /* }}} static void journal_rotate */
1927
1928 /* MUST hold journal_lock when calling */
1929 static void journal_done(void) /* {{{ */
1930 {
1931   if (journal_cur == NULL)
1932     return;
1933
1934   journal_close();
1935
1936   if (config_flush_at_shutdown)
1937   {
1938     RRDD_LOG(LOG_INFO, "removing journals");
1939     journal_set_remove(journal_old);
1940     journal_set_remove(journal_cur);
1941   }
1942   else
1943   {
1944     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1945              "journals will be used at next startup");
1946   }
1947
1948   journal_set_free(journal_cur);
1949   journal_set_free(journal_old);
1950   free(journal_dir);
1951
1952 } /* }}} static void journal_done */
1953
1954 static int journal_write(char *cmd, char *args) /* {{{ */
1955 {
1956   int chars;
1957
1958   if (journal_fh == NULL)
1959     return 0;
1960
1961   pthread_mutex_lock(&journal_lock);
1962   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1963   journal_size += chars;
1964
1965   if (journal_size > JOURNAL_MAX)
1966     journal_new_file();
1967
1968   pthread_mutex_unlock(&journal_lock);
1969
1970   if (chars > 0)
1971   {
1972     pthread_mutex_lock(&stats_lock);
1973     stats_journal_bytes += chars;
1974     pthread_mutex_unlock(&stats_lock);
1975   }
1976
1977   return chars;
1978 } /* }}} static int journal_write */
1979
1980 static int journal_replay (const char *file) /* {{{ */
1981 {
1982   FILE *fh;
1983   int entry_cnt = 0;
1984   int fail_cnt = 0;
1985   uint64_t line = 0;
1986   char entry[CMD_MAX];
1987   time_t now;
1988
1989   if (file == NULL) return 0;
1990
1991   {
1992     char *reason = "unknown error";
1993     int status = 0;
1994     struct stat statbuf;
1995
1996     memset(&statbuf, 0, sizeof(statbuf));
1997     if (stat(file, &statbuf) != 0)
1998     {
1999       reason = "stat error";
2000       status = errno;
2001     }
2002     else if (!S_ISREG(statbuf.st_mode))
2003     {
2004       reason = "not a regular file";
2005       status = EPERM;
2006     }
2007     if (statbuf.st_uid != daemon_uid)
2008     {
2009       reason = "not owned by daemon user";
2010       status = EACCES;
2011     }
2012     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2013     {
2014       reason = "must not be user/group writable";
2015       status = EACCES;
2016     }
2017
2018     if (status != 0)
2019     {
2020       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2021                file, rrd_strerror(status), reason);
2022       return 0;
2023     }
2024   }
2025
2026   fh = fopen(file, "r");
2027   if (fh == NULL)
2028   {
2029     if (errno != ENOENT)
2030       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2031                file, rrd_strerror(errno));
2032     return 0;
2033   }
2034   else
2035     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2036
2037   now = time(NULL);
2038
2039   while(!feof(fh))
2040   {
2041     size_t entry_len;
2042
2043     ++line;
2044     if (fgets(entry, sizeof(entry), fh) == NULL)
2045       break;
2046     entry_len = strlen(entry);
2047
2048     /* check \n termination in case journal writing crashed mid-line */
2049     if (entry_len == 0)
2050       continue;
2051     else if (entry[entry_len - 1] != '\n')
2052     {
2053       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2054       ++fail_cnt;
2055       continue;
2056     }
2057
2058     entry[entry_len - 1] = '\0';
2059
2060     if (handle_request(NULL, now, entry, entry_len) == 0)
2061       ++entry_cnt;
2062     else
2063       ++fail_cnt;
2064   }
2065
2066   fclose(fh);
2067
2068   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2069            entry_cnt, fail_cnt);
2070
2071   return entry_cnt > 0 ? 1 : 0;
2072 } /* }}} static int journal_replay */
2073
2074 static int journal_sort(const void *v1, const void *v2)
2075 {
2076   char **jn1 = (char **) v1;
2077   char **jn2 = (char **) v2;
2078
2079   return strcmp(*jn1,*jn2);
2080 }
2081
2082 static void journal_init(void) /* {{{ */
2083 {
2084   int had_journal = 0;
2085   DIR *dir;
2086   struct dirent *dent;
2087   char path[PATH_MAX+1];
2088
2089   if (journal_dir == NULL) return;
2090
2091   pthread_mutex_lock(&journal_lock);
2092
2093   journal_cur = calloc(1, sizeof(journal_set));
2094   if (journal_cur == NULL)
2095   {
2096     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2097     return;
2098   }
2099
2100   RRDD_LOG(LOG_INFO, "checking for journal files");
2101
2102   /* Handle old journal files during transition.  This gives them the
2103    * correct sort order.  TODO: remove after first release
2104    */
2105   {
2106     char old_path[PATH_MAX+1];
2107     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2108     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2109     rename(old_path, path);
2110
2111     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2112     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2113     rename(old_path, path);
2114   }
2115
2116   dir = opendir(journal_dir);
2117   while ((dent = readdir(dir)) != NULL)
2118   {
2119     /* looks like a journal file? */
2120     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2121       continue;
2122
2123     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2124
2125     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2126     {
2127       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2128                dent->d_name);
2129       break;
2130     }
2131   }
2132   closedir(dir);
2133
2134   qsort(journal_cur->files, journal_cur->files_num,
2135         sizeof(journal_cur->files[0]), journal_sort);
2136
2137   for (uint i=0; i < journal_cur->files_num; i++)
2138     had_journal += journal_replay(journal_cur->files[i]);
2139
2140   journal_new_file();
2141
2142   /* it must have been a crash.  start a flush */
2143   if (had_journal && config_flush_at_shutdown)
2144     flush_old_values(-1);
2145
2146   pthread_mutex_unlock(&journal_lock);
2147
2148   RRDD_LOG(LOG_INFO, "journal processing complete");
2149
2150 } /* }}} static void journal_init */
2151
2152 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2153 {
2154   assert(sock != NULL);
2155
2156   free(sock->rbuf);  sock->rbuf = NULL;
2157   free(sock->wbuf);  sock->wbuf = NULL;
2158   free(sock);
2159 } /* }}} void free_listen_socket */
2160
2161 static void close_connection(listen_socket_t *sock) /* {{{ */
2162 {
2163   if (sock->fd >= 0)
2164   {
2165     close(sock->fd);
2166     sock->fd = -1;
2167   }
2168
2169   free_listen_socket(sock);
2170
2171 } /* }}} void close_connection */
2172
2173 static void *connection_thread_main (void *args) /* {{{ */
2174 {
2175   listen_socket_t *sock;
2176   int fd;
2177
2178   sock = (listen_socket_t *) args;
2179   fd = sock->fd;
2180
2181   /* init read buffers */
2182   sock->next_read = sock->next_cmd = 0;
2183   sock->rbuf = malloc(RBUF_SIZE);
2184   if (sock->rbuf == NULL)
2185   {
2186     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2187     close_connection(sock);
2188     return NULL;
2189   }
2190
2191   pthread_mutex_lock (&connection_threads_lock);
2192   connection_threads_num++;
2193   pthread_mutex_unlock (&connection_threads_lock);
2194
2195   while (state == RUNNING)
2196   {
2197     char *cmd;
2198     ssize_t cmd_len;
2199     ssize_t rbytes;
2200     time_t now;
2201
2202     struct pollfd pollfd;
2203     int status;
2204
2205     pollfd.fd = fd;
2206     pollfd.events = POLLIN | POLLPRI;
2207     pollfd.revents = 0;
2208
2209     status = poll (&pollfd, 1, /* timeout = */ 500);
2210     if (state != RUNNING)
2211       break;
2212     else if (status == 0) /* timeout */
2213       continue;
2214     else if (status < 0) /* error */
2215     {
2216       status = errno;
2217       if (status != EINTR)
2218         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2219       continue;
2220     }
2221
2222     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2223       break;
2224     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2225     {
2226       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2227           "poll(2) returned something unexpected: %#04hx",
2228           pollfd.revents);
2229       break;
2230     }
2231
2232     rbytes = read(fd, sock->rbuf + sock->next_read,
2233                   RBUF_SIZE - sock->next_read);
2234     if (rbytes < 0)
2235     {
2236       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2237       break;
2238     }
2239     else if (rbytes == 0)
2240       break; /* eof */
2241
2242     sock->next_read += rbytes;
2243
2244     if (sock->batch_start)
2245       now = sock->batch_start;
2246     else
2247       now = time(NULL);
2248
2249     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2250     {
2251       status = handle_request (sock, now, cmd, cmd_len+1);
2252       if (status != 0)
2253         goto out_close;
2254     }
2255   }
2256
2257 out_close:
2258   close_connection(sock);
2259
2260   /* Remove this thread from the connection threads list */
2261   pthread_mutex_lock (&connection_threads_lock);
2262   connection_threads_num--;
2263   if (connection_threads_num <= 0)
2264     pthread_cond_broadcast(&connection_threads_done);
2265   pthread_mutex_unlock (&connection_threads_lock);
2266
2267   return (NULL);
2268 } /* }}} void *connection_thread_main */
2269
2270 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2271 {
2272   int fd;
2273   struct sockaddr_un sa;
2274   listen_socket_t *temp;
2275   int status;
2276   const char *path;
2277   char *path_copy, *dir;
2278
2279   path = sock->addr;
2280   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2281     path += strlen("unix:");
2282
2283   /* dirname may modify its argument */
2284   path_copy = strdup(path);
2285   if (path_copy == NULL)
2286   {
2287     fprintf(stderr, "rrdcached: strdup(): %s\n",
2288         rrd_strerror(errno));
2289     return (-1);
2290   }
2291
2292   dir = dirname(path_copy);
2293   if (rrd_mkdir_p(dir, 0777) != 0)
2294   {
2295     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2296         dir, rrd_strerror(errno));
2297     return (-1);
2298   }
2299
2300   free(path_copy);
2301
2302   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2303       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2304   if (temp == NULL)
2305   {
2306     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2307     return (-1);
2308   }
2309   listen_fds = temp;
2310   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2311
2312   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2313   if (fd < 0)
2314   {
2315     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2316              rrd_strerror(errno));
2317     return (-1);
2318   }
2319
2320   memset (&sa, 0, sizeof (sa));
2321   sa.sun_family = AF_UNIX;
2322   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2323
2324   /* if we've gotten this far, we own the pid file.  any daemon started
2325    * with the same args must not be alive.  therefore, ensure that we can
2326    * create the socket...
2327    */
2328   unlink(path);
2329
2330   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2331   if (status != 0)
2332   {
2333     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2334              path, rrd_strerror(errno));
2335     close (fd);
2336     return (-1);
2337   }
2338
2339   /* tweak the sockets group ownership */
2340   if (sock->socket_group != (gid_t)-1)
2341   {
2342     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2343          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2344     {
2345       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2346     }
2347   }
2348
2349   if (sock->socket_permissions != (mode_t)-1)
2350   {
2351     if (chmod(path, sock->socket_permissions) != 0)
2352       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2353           (unsigned int)sock->socket_permissions, strerror(errno));
2354   }
2355
2356   status = listen (fd, /* backlog = */ 10);
2357   if (status != 0)
2358   {
2359     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2360              path, rrd_strerror(errno));
2361     close (fd);
2362     unlink (path);
2363     return (-1);
2364   }
2365
2366   listen_fds[listen_fds_num].fd = fd;
2367   listen_fds[listen_fds_num].family = PF_UNIX;
2368   strncpy(listen_fds[listen_fds_num].addr, path,
2369           sizeof (listen_fds[listen_fds_num].addr) - 1);
2370   listen_fds_num++;
2371
2372   return (0);
2373 } /* }}} int open_listen_socket_unix */
2374
2375 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2376 {
2377   struct addrinfo ai_hints;
2378   struct addrinfo *ai_res;
2379   struct addrinfo *ai_ptr;
2380   char addr_copy[NI_MAXHOST];
2381   char *addr;
2382   char *port;
2383   int status;
2384
2385   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2386   addr_copy[sizeof (addr_copy) - 1] = 0;
2387   addr = addr_copy;
2388
2389   memset (&ai_hints, 0, sizeof (ai_hints));
2390   ai_hints.ai_flags = 0;
2391 #ifdef AI_ADDRCONFIG
2392   ai_hints.ai_flags |= AI_ADDRCONFIG;
2393 #endif
2394   ai_hints.ai_family = AF_UNSPEC;
2395   ai_hints.ai_socktype = SOCK_STREAM;
2396
2397   port = NULL;
2398   if (*addr == '[') /* IPv6+port format */
2399   {
2400     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2401     addr++;
2402
2403     port = strchr (addr, ']');
2404     if (port == NULL)
2405     {
2406       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2407       return (-1);
2408     }
2409     *port = 0;
2410     port++;
2411
2412     if (*port == ':')
2413       port++;
2414     else if (*port == 0)
2415       port = NULL;
2416     else
2417     {
2418       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2419       return (-1);
2420     }
2421   } /* if (*addr == '[') */
2422   else
2423   {
2424     port = rindex(addr, ':');
2425     if (port != NULL)
2426     {
2427       *port = 0;
2428       port++;
2429     }
2430   }
2431   ai_res = NULL;
2432   status = getaddrinfo (addr,
2433                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2434                         &ai_hints, &ai_res);
2435   if (status != 0)
2436   {
2437     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2438              addr, gai_strerror (status));
2439     return (-1);
2440   }
2441
2442   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2443   {
2444     int fd;
2445     listen_socket_t *temp;
2446     int one = 1;
2447
2448     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2449         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2450     if (temp == NULL)
2451     {
2452       fprintf (stderr,
2453                "rrdcached: open_listen_socket_network: realloc failed.\n");
2454       continue;
2455     }
2456     listen_fds = temp;
2457     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2458
2459     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2460     if (fd < 0)
2461     {
2462       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2463                rrd_strerror(errno));
2464       continue;
2465     }
2466
2467     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2468
2469     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2470     if (status != 0)
2471     {
2472       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2473                sock->addr, rrd_strerror(errno));
2474       close (fd);
2475       continue;
2476     }
2477
2478     status = listen (fd, /* backlog = */ 10);
2479     if (status != 0)
2480     {
2481       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2482                sock->addr, rrd_strerror(errno));
2483       close (fd);
2484       freeaddrinfo(ai_res);
2485       return (-1);
2486     }
2487
2488     listen_fds[listen_fds_num].fd = fd;
2489     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2490     listen_fds_num++;
2491   } /* for (ai_ptr) */
2492
2493   freeaddrinfo(ai_res);
2494   return (0);
2495 } /* }}} static int open_listen_socket_network */
2496
2497 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2498 {
2499   assert(sock != NULL);
2500   assert(sock->addr != NULL);
2501
2502   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2503       || sock->addr[0] == '/')
2504     return (open_listen_socket_unix(sock));
2505   else
2506     return (open_listen_socket_network(sock));
2507 } /* }}} int open_listen_socket */
2508
2509 static int close_listen_sockets (void) /* {{{ */
2510 {
2511   size_t i;
2512
2513   for (i = 0; i < listen_fds_num; i++)
2514   {
2515     close (listen_fds[i].fd);
2516
2517     if (listen_fds[i].family == PF_UNIX)
2518       unlink(listen_fds[i].addr);
2519   }
2520
2521   free (listen_fds);
2522   listen_fds = NULL;
2523   listen_fds_num = 0;
2524
2525   return (0);
2526 } /* }}} int close_listen_sockets */
2527
2528 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2529 {
2530   struct pollfd *pollfds;
2531   int pollfds_num;
2532   int status;
2533   int i;
2534
2535   if (listen_fds_num < 1)
2536   {
2537     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2538     return (NULL);
2539   }
2540
2541   pollfds_num = listen_fds_num;
2542   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2543   if (pollfds == NULL)
2544   {
2545     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2546     return (NULL);
2547   }
2548   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2549
2550   RRDD_LOG(LOG_INFO, "listening for connections");
2551
2552   while (state == RUNNING)
2553   {
2554     for (i = 0; i < pollfds_num; i++)
2555     {
2556       pollfds[i].fd = listen_fds[i].fd;
2557       pollfds[i].events = POLLIN | POLLPRI;
2558       pollfds[i].revents = 0;
2559     }
2560
2561     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2562     if (state != RUNNING)
2563       break;
2564     else if (status == 0) /* timeout */
2565       continue;
2566     else if (status < 0) /* error */
2567     {
2568       status = errno;
2569       if (status != EINTR)
2570       {
2571         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2572       }
2573       continue;
2574     }
2575
2576     for (i = 0; i < pollfds_num; i++)
2577     {
2578       listen_socket_t *client_sock;
2579       struct sockaddr_storage client_sa;
2580       socklen_t client_sa_size;
2581       pthread_t tid;
2582       pthread_attr_t attr;
2583
2584       if (pollfds[i].revents == 0)
2585         continue;
2586
2587       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2588       {
2589         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2590             "poll(2) returned something unexpected for listen FD #%i.",
2591             pollfds[i].fd);
2592         continue;
2593       }
2594
2595       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2596       if (client_sock == NULL)
2597       {
2598         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2599         continue;
2600       }
2601       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2602
2603       client_sa_size = sizeof (client_sa);
2604       client_sock->fd = accept (pollfds[i].fd,
2605           (struct sockaddr *) &client_sa, &client_sa_size);
2606       if (client_sock->fd < 0)
2607       {
2608         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2609         free(client_sock);
2610         continue;
2611       }
2612
2613       pthread_attr_init (&attr);
2614       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2615
2616       status = pthread_create (&tid, &attr, connection_thread_main,
2617                                client_sock);
2618       if (status != 0)
2619       {
2620         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2621         close_connection(client_sock);
2622         continue;
2623       }
2624     } /* for (pollfds_num) */
2625   } /* while (state == RUNNING) */
2626
2627   RRDD_LOG(LOG_INFO, "starting shutdown");
2628
2629   close_listen_sockets ();
2630
2631   pthread_mutex_lock (&connection_threads_lock);
2632   while (connection_threads_num > 0)
2633     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2634   pthread_mutex_unlock (&connection_threads_lock);
2635
2636   free(pollfds);
2637
2638   return (NULL);
2639 } /* }}} void *listen_thread_main */
2640
2641 static int daemonize (void) /* {{{ */
2642 {
2643   int pid_fd;
2644   char *base_dir;
2645
2646   daemon_uid = geteuid();
2647
2648   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2649   if (pid_fd < 0)
2650     pid_fd = check_pidfile();
2651   if (pid_fd < 0)
2652     return pid_fd;
2653
2654   /* open all the listen sockets */
2655   if (config_listen_address_list_len > 0)
2656   {
2657     for (size_t i = 0; i < config_listen_address_list_len; i++)
2658       open_listen_socket (config_listen_address_list[i]);
2659
2660     rrd_free_ptrs((void ***) &config_listen_address_list,
2661                   &config_listen_address_list_len);
2662   }
2663   else
2664   {
2665     listen_socket_t sock;
2666     memset(&sock, 0, sizeof(sock));
2667     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2668     open_listen_socket (&sock);
2669   }
2670
2671   if (listen_fds_num < 1)
2672   {
2673     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2674     goto error;
2675   }
2676
2677   if (!stay_foreground)
2678   {
2679     pid_t child;
2680
2681     child = fork ();
2682     if (child < 0)
2683     {
2684       fprintf (stderr, "daemonize: fork(2) failed.\n");
2685       goto error;
2686     }
2687     else if (child > 0)
2688       exit(0);
2689
2690     /* Become session leader */
2691     setsid ();
2692
2693     /* Open the first three file descriptors to /dev/null */
2694     close (2);
2695     close (1);
2696     close (0);
2697
2698     open ("/dev/null", O_RDWR);
2699     if (dup(0) == -1 || dup(0) == -1){
2700         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2701     }
2702   } /* if (!stay_foreground) */
2703
2704   /* Change into the /tmp directory. */
2705   base_dir = (config_base_dir != NULL)
2706     ? config_base_dir
2707     : "/tmp";
2708
2709   if (chdir (base_dir) != 0)
2710   {
2711     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2712     goto error;
2713   }
2714
2715   install_signal_handlers();
2716
2717   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2718   RRDD_LOG(LOG_INFO, "starting up");
2719
2720   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2721                                 (GDestroyNotify) free_cache_item);
2722   if (cache_tree == NULL)
2723   {
2724     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2725     goto error;
2726   }
2727
2728   return write_pidfile (pid_fd);
2729
2730 error:
2731   remove_pidfile();
2732   return -1;
2733 } /* }}} int daemonize */
2734
2735 static int cleanup (void) /* {{{ */
2736 {
2737   pthread_cond_broadcast (&flush_cond);
2738   pthread_join (flush_thread, NULL);
2739
2740   pthread_cond_broadcast (&queue_cond);
2741   for (int i = 0; i < config_queue_threads; i++)
2742     pthread_join (queue_threads[i], NULL);
2743
2744   if (config_flush_at_shutdown)
2745   {
2746     assert(cache_queue_head == NULL);
2747     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2748   }
2749
2750   free(queue_threads);
2751   free(config_base_dir);
2752
2753   pthread_mutex_lock(&cache_lock);
2754   g_tree_destroy(cache_tree);
2755
2756   pthread_mutex_lock(&journal_lock);
2757   journal_done();
2758
2759   RRDD_LOG(LOG_INFO, "goodbye");
2760   closelog ();
2761
2762   remove_pidfile ();
2763   free(config_pid_file);
2764
2765   return (0);
2766 } /* }}} int cleanup */
2767
2768 static int read_options (int argc, char **argv) /* {{{ */
2769 {
2770   int option;
2771   int status = 0;
2772
2773   char **permissions = NULL;
2774   size_t permissions_len = 0;
2775
2776   gid_t  socket_group = (gid_t)-1;
2777   mode_t socket_permissions = (mode_t)-1;
2778
2779   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2780   {
2781     switch (option)
2782     {
2783       case 'g':
2784         stay_foreground=1;
2785         break;
2786
2787       case 'l':
2788       {
2789         listen_socket_t *new;
2790
2791         new = malloc(sizeof(listen_socket_t));
2792         if (new == NULL)
2793         {
2794           fprintf(stderr, "read_options: malloc failed.\n");
2795           return(2);
2796         }
2797         memset(new, 0, sizeof(listen_socket_t));
2798
2799         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2800
2801         /* Add permissions to the socket {{{ */
2802         if (permissions_len != 0)
2803         {
2804           size_t i;
2805           for (i = 0; i < permissions_len; i++)
2806           {
2807             status = socket_permission_add (new, permissions[i]);
2808             if (status != 0)
2809             {
2810               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2811                   "socket failed. Most likely, this permission doesn't "
2812                   "exist. Check your command line.\n", permissions[i]);
2813               status = 4;
2814             }
2815           }
2816         }
2817         else /* if (permissions_len == 0) */
2818         {
2819           /* Add permission for ALL commands to the socket. */
2820           size_t i;
2821           for (i = 0; i < list_of_commands_len; i++)
2822           {
2823             status = socket_permission_add (new, list_of_commands[i].cmd);
2824             if (status != 0)
2825             {
2826               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2827                   "socket failed. This should never happen, ever! Sorry.\n",
2828                   permissions[i]);
2829               status = 4;
2830             }
2831           }
2832         }
2833         /* }}} Done adding permissions. */
2834
2835         new->socket_group = socket_group;
2836         new->socket_permissions = socket_permissions;
2837
2838         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2839                          &config_listen_address_list_len, new))
2840         {
2841           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2842           return (2);
2843         }
2844       }
2845       break;
2846
2847       /* set socket group permissions */
2848       case 's':
2849       {
2850         gid_t group_gid;
2851         struct group *grp;
2852
2853         group_gid = strtoul(optarg, NULL, 10);
2854         if (errno != EINVAL && group_gid>0)
2855         {
2856           /* we were passed a number */
2857           grp = getgrgid(group_gid);
2858         }
2859         else
2860         {
2861           grp = getgrnam(optarg);
2862         }
2863
2864         if (grp)
2865         {
2866           socket_group = grp->gr_gid;
2867         }
2868         else
2869         {
2870           /* no idea what the user wanted... */
2871           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2872           return (5);
2873         }
2874       }
2875       break;
2876
2877       /* set socket file permissions */
2878       case 'm':
2879       {
2880         long  tmp;
2881         char *endptr = NULL;
2882
2883         tmp = strtol (optarg, &endptr, 8);
2884         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2885             || (tmp > 07777) || (tmp < 0)) {
2886           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2887               optarg);
2888           return (5);
2889         }
2890
2891         socket_permissions = (mode_t)tmp;
2892       }
2893       break;
2894
2895       case 'P':
2896       {
2897         char *optcopy;
2898         char *saveptr;
2899         char *dummy;
2900         char *ptr;
2901
2902         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2903
2904         optcopy = strdup (optarg);
2905         dummy = optcopy;
2906         saveptr = NULL;
2907         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2908         {
2909           dummy = NULL;
2910           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2911         }
2912
2913         free (optcopy);
2914       }
2915       break;
2916
2917       case 'f':
2918       {
2919         int temp;
2920
2921         temp = atoi (optarg);
2922         if (temp > 0)
2923           config_flush_interval = temp;
2924         else
2925         {
2926           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2927           status = 3;
2928         }
2929       }
2930       break;
2931
2932       case 'w':
2933       {
2934         int temp;
2935
2936         temp = atoi (optarg);
2937         if (temp > 0)
2938           config_write_interval = temp;
2939         else
2940         {
2941           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2942           status = 2;
2943         }
2944       }
2945       break;
2946
2947       case 'z':
2948       {
2949         int temp;
2950
2951         temp = atoi(optarg);
2952         if (temp > 0)
2953           config_write_jitter = temp;
2954         else
2955         {
2956           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2957           status = 2;
2958         }
2959
2960         break;
2961       }
2962
2963       case 't':
2964       {
2965         int threads;
2966         threads = atoi(optarg);
2967         if (threads >= 1)
2968           config_queue_threads = threads;
2969         else
2970         {
2971           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2972           return 1;
2973         }
2974       }
2975       break;
2976
2977       case 'B':
2978         config_write_base_only = 1;
2979         break;
2980
2981       case 'b':
2982       {
2983         size_t len;
2984         char base_realpath[PATH_MAX];
2985
2986         if (config_base_dir != NULL)
2987           free (config_base_dir);
2988         config_base_dir = strdup (optarg);
2989         if (config_base_dir == NULL)
2990         {
2991           fprintf (stderr, "read_options: strdup failed.\n");
2992           return (3);
2993         }
2994
2995         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2996         {
2997           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2998               config_base_dir, rrd_strerror (errno));
2999           return (3);
3000         }
3001
3002         /* make sure that the base directory is not resolved via
3003          * symbolic links.  this makes some performance-enhancing
3004          * assumptions possible (we don't have to resolve paths
3005          * that start with a "/")
3006          */
3007         if (realpath(config_base_dir, base_realpath) == NULL)
3008         {
3009           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3010               "%s\n", config_base_dir, rrd_strerror(errno));
3011           return 5;
3012         }
3013
3014         len = strlen (config_base_dir);
3015         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3016         {
3017           config_base_dir[len - 1] = 0;
3018           len--;
3019         }
3020
3021         if (len < 1)
3022         {
3023           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3024           return (4);
3025         }
3026
3027         _config_base_dir_len = len;
3028
3029         len = strlen (base_realpath);
3030         while ((len > 0) && (base_realpath[len - 1] == '/'))
3031         {
3032           base_realpath[len - 1] = '\0';
3033           len--;
3034         }
3035
3036         if (strncmp(config_base_dir,
3037                          base_realpath, sizeof(base_realpath)) != 0)
3038         {
3039           fprintf(stderr,
3040                   "Base directory (-b) resolved via file system links!\n"
3041                   "Please consult rrdcached '-b' documentation!\n"
3042                   "Consider specifying the real directory (%s)\n",
3043                   base_realpath);
3044           return 5;
3045         }
3046       }
3047       break;
3048
3049       case 'p':
3050       {
3051         if (config_pid_file != NULL)
3052           free (config_pid_file);
3053         config_pid_file = strdup (optarg);
3054         if (config_pid_file == NULL)
3055         {
3056           fprintf (stderr, "read_options: strdup failed.\n");
3057           return (3);
3058         }
3059       }
3060       break;
3061
3062       case 'F':
3063         config_flush_at_shutdown = 1;
3064         break;
3065
3066       case 'j':
3067       {
3068         const char *dir = journal_dir = strdup(optarg);
3069
3070         status = rrd_mkdir_p(dir, 0777);
3071         if (status != 0)
3072         {
3073           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3074               dir, rrd_strerror(errno));
3075           return 6;
3076         }
3077
3078         if (access(dir, R_OK|W_OK|X_OK) != 0)
3079         {
3080           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3081                   errno ? rrd_strerror(errno) : "");
3082           return 6;
3083         }
3084       }
3085       break;
3086
3087       case 'h':
3088       case '?':
3089         printf ("RRDCacheD %s\n"
3090             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3091             "\n"
3092             "Usage: rrdcached [options]\n"
3093             "\n"
3094             "Valid options are:\n"
3095             "  -l <address>  Socket address to listen to.\n"
3096             "  -P <perms>    Sets the permissions to assign to all following "
3097                             "sockets\n"
3098             "  -w <seconds>  Interval in which to write data.\n"
3099             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3100             "  -t <threads>  Number of write threads.\n"
3101             "  -f <seconds>  Interval in which to flush dead data.\n"
3102             "  -p <file>     Location of the PID-file.\n"
3103             "  -b <dir>      Base directory to change to.\n"
3104             "  -B            Restrict file access to paths within -b <dir>\n"
3105             "  -g            Do not fork and run in the foreground.\n"
3106             "  -j <dir>      Directory in which to create the journal files.\n"
3107             "  -F            Always flush all updates at shutdown\n"
3108             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3109             "                (the socket will also have read/write permissions "
3110                             "for that group)\n"
3111             "  -m <mode>     File permissions (octal) of all following UNIX "
3112                             "sockets\n"
3113             "\n"
3114             "For more information and a detailed description of all options "
3115             "please refer\n"
3116             "to the rrdcached(1) manual page.\n",
3117             VERSION);
3118         if (option == 'h')
3119           status = -1;
3120         else
3121           status = 1;
3122         break;
3123     } /* switch (option) */
3124   } /* while (getopt) */
3125
3126   /* advise the user when values are not sane */
3127   if (config_flush_interval < 2 * config_write_interval)
3128     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3129             " 2x write interval (-w) !\n");
3130   if (config_write_jitter > config_write_interval)
3131     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3132             " write interval (-w) !\n");
3133
3134   if (config_write_base_only && config_base_dir == NULL)
3135     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3136             "  Consult the rrdcached documentation\n");
3137
3138   if (journal_dir == NULL)
3139     config_flush_at_shutdown = 1;
3140
3141   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3142
3143   return (status);
3144 } /* }}} int read_options */
3145
3146 int main (int argc, char **argv)
3147 {
3148   int status;
3149
3150   status = read_options (argc, argv);
3151   if (status != 0)
3152   {
3153     if (status < 0)
3154       status = 0;
3155     return (status);
3156   }
3157
3158   status = daemonize ();
3159   if (status != 0)
3160   {
3161     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3162     return (1);
3163   }
3164
3165   journal_init();
3166
3167   /* start the queue threads */
3168   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3169   if (queue_threads == NULL)
3170   {
3171     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3172     cleanup();
3173     return (1);
3174   }
3175   for (int i = 0; i < config_queue_threads; i++)
3176   {
3177     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3178     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3179     if (status != 0)
3180     {
3181       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3182       cleanup();
3183       return (1);
3184     }
3185   }
3186
3187   /* start the flush thread */
3188   memset(&flush_thread, 0, sizeof(flush_thread));
3189   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3190   if (status != 0)
3191   {
3192     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3193     cleanup();
3194     return (1);
3195   }
3196
3197   listen_thread_main (NULL);
3198   cleanup ();
3199
3200   return (0);
3201 } /* int main */
3202
3203 /*
3204  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3205  */