ba9c2adfa979cf955bd1858507b442af01ed2b3a
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
110
111 #include <glib-2.0/glib.h>
112 /* }}} */
113
114 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
115
116 #ifndef __GNUC__
117 # define __attribute__(x) /**/
118 #endif
119
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124
125 struct listen_socket_s
126 {
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
130
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
134
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
139
140   char *wbuf;
141   ssize_t wbuf_len;
142
143   uint32_t permissions;
144
145   gid_t  socket_group;
146   mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
149
150 struct command_s;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
154                         time_t now              __attribute__((unused)),\
155                         char  *buffer           __attribute__((unused)),\
156                         size_t buffer_size      __attribute__((unused))
157
158 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
159                         DISPATCH_PROTO
160
161 struct command_s {
162   char   *cmd;
163   int (*handler)(HANDLER_PROTO);
164
165   char  context;                /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT      (1<<0)
167 #define CMD_CONTEXT_BATCH       (1<<1)
168 #define CMD_CONTEXT_JOURNAL     (1<<2)
169 #define CMD_CONTEXT_ANY         (0x7f)
170
171   char *syntax;
172   char *help;
173 };
174
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s
178 {
179   char *file;
180   char **values;
181   size_t values_num;
182   time_t last_flush_time;
183   time_t last_update_stamp;
184 #define CI_FLAGS_IN_TREE  (1<<0)
185 #define CI_FLAGS_IN_QUEUE (1<<1)
186   int flags;
187   pthread_cond_t  flushed;
188   cache_item_t *prev;
189   cache_item_t *next;
190 };
191
192 struct callback_flush_data_s
193 {
194   time_t now;
195   time_t abs_timeout;
196   char **keys;
197   size_t keys_num;
198 };
199 typedef struct callback_flush_data_s callback_flush_data_t;
200
201 enum queue_side_e
202 {
203   HEAD,
204   TAIL
205 };
206 typedef enum queue_side_e queue_side_t;
207
208 /* describe a set of journal files */
209 typedef struct {
210   char **files;
211   size_t files_num;
212 } journal_set;
213
214 /* max length of socket command or response */
215 #define CMD_MAX 4096
216 #define RBUF_SIZE (CMD_MAX*2)
217
218 /*
219  * Variables
220  */
221 static int stay_foreground = 0;
222 static uid_t daemon_uid;
223
224 static listen_socket_t *listen_fds = NULL;
225 static size_t listen_fds_num = 0;
226
227 enum {
228   RUNNING,              /* normal operation */
229   FLUSHING,             /* flushing remaining values */
230   SHUTDOWN              /* shutting down */
231 } state = RUNNING;
232
233 static pthread_t *queue_threads;
234 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
235 static int config_queue_threads = 4;
236
237 static pthread_t flush_thread;
238 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
239
240 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
241 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
242 static int connection_threads_num = 0;
243
244 /* Cache stuff */
245 static GTree          *cache_tree = NULL;
246 static cache_item_t   *cache_queue_head = NULL;
247 static cache_item_t   *cache_queue_tail = NULL;
248 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
249
250 static int config_write_interval = 300;
251 static int config_write_jitter   = 0;
252 static int config_flush_interval = 3600;
253 static int config_flush_at_shutdown = 0;
254 static char *config_pid_file = NULL;
255 static char *config_base_dir = NULL;
256 static size_t _config_base_dir_len = 0;
257 static int config_write_base_only = 0;
258
259 static listen_socket_t **config_listen_address_list = NULL;
260 static size_t config_listen_address_list_len = 0;
261
262 static uint64_t stats_queue_length = 0;
263 static uint64_t stats_updates_received = 0;
264 static uint64_t stats_flush_received = 0;
265 static uint64_t stats_updates_written = 0;
266 static uint64_t stats_data_sets_written = 0;
267 static uint64_t stats_journal_bytes = 0;
268 static uint64_t stats_journal_rotate = 0;
269 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
270
271 /* Journaled updates */
272 #define JOURNAL_REPLAY(s) ((s) == NULL)
273 #define JOURNAL_BASE "rrd.journal"
274 static journal_set *journal_cur = NULL;
275 static journal_set *journal_old = NULL;
276 static char *journal_dir = NULL;
277 static FILE *journal_fh = NULL;         /* current journal file handle */
278 static long  journal_size = 0;          /* current journal size */
279 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
280 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
281 static int journal_write(char *cmd, char *args);
282 static void journal_done(void);
283 static void journal_rotate(void);
284
285 /* prototypes for forward refernces */
286 static int handle_request_help (HANDLER_PROTO);
287
288 /* 
289  * Functions
290  */
291 static void sig_common (const char *sig) /* {{{ */
292 {
293   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
294   state = FLUSHING;
295   pthread_cond_broadcast(&flush_cond);
296   pthread_cond_broadcast(&queue_cond);
297 } /* }}} void sig_common */
298
299 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
300 {
301   sig_common("INT");
302 } /* }}} void sig_int_handler */
303
304 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
305 {
306   sig_common("TERM");
307 } /* }}} void sig_term_handler */
308
309 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
310 {
311   config_flush_at_shutdown = 1;
312   sig_common("USR1");
313 } /* }}} void sig_usr1_handler */
314
315 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
316 {
317   config_flush_at_shutdown = 0;
318   sig_common("USR2");
319 } /* }}} void sig_usr2_handler */
320
321 static void install_signal_handlers(void) /* {{{ */
322 {
323   /* These structures are static, because `sigaction' behaves weird if the are
324    * overwritten.. */
325   static struct sigaction sa_int;
326   static struct sigaction sa_term;
327   static struct sigaction sa_pipe;
328   static struct sigaction sa_usr1;
329   static struct sigaction sa_usr2;
330
331   /* Install signal handlers */
332   memset (&sa_int, 0, sizeof (sa_int));
333   sa_int.sa_handler = sig_int_handler;
334   sigaction (SIGINT, &sa_int, NULL);
335
336   memset (&sa_term, 0, sizeof (sa_term));
337   sa_term.sa_handler = sig_term_handler;
338   sigaction (SIGTERM, &sa_term, NULL);
339
340   memset (&sa_pipe, 0, sizeof (sa_pipe));
341   sa_pipe.sa_handler = SIG_IGN;
342   sigaction (SIGPIPE, &sa_pipe, NULL);
343
344   memset (&sa_pipe, 0, sizeof (sa_usr1));
345   sa_usr1.sa_handler = sig_usr1_handler;
346   sigaction (SIGUSR1, &sa_usr1, NULL);
347
348   memset (&sa_usr2, 0, sizeof (sa_usr2));
349   sa_usr2.sa_handler = sig_usr2_handler;
350   sigaction (SIGUSR2, &sa_usr2, NULL);
351
352 } /* }}} void install_signal_handlers */
353
354 static int open_pidfile(char *action, int oflag) /* {{{ */
355 {
356   int fd;
357   const char *file;
358   char *file_copy, *dir;
359
360   file = (config_pid_file != NULL)
361     ? config_pid_file
362     : LOCALSTATEDIR "/run/rrdcached.pid";
363
364   /* dirname may modify its argument */
365   file_copy = strdup(file);
366   if (file_copy == NULL)
367   {
368     fprintf(stderr, "rrdcached: strdup(): %s\n",
369         rrd_strerror(errno));
370     return -1;
371   }
372
373   dir = dirname(file_copy);
374   if (rrd_mkdir_p(dir, 0777) != 0)
375   {
376     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
377         dir, rrd_strerror(errno));
378     return -1;
379   }
380
381   free(file_copy);
382
383   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
384   if (fd < 0)
385     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
386             action, file, rrd_strerror(errno));
387
388   return(fd);
389 } /* }}} static int open_pidfile */
390
391 /* check existing pid file to see whether a daemon is running */
392 static int check_pidfile(void)
393 {
394   int pid_fd;
395   pid_t pid;
396   char pid_str[16];
397
398   pid_fd = open_pidfile("open", O_RDWR);
399   if (pid_fd < 0)
400     return pid_fd;
401
402   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
403     return -1;
404
405   pid = atoi(pid_str);
406   if (pid <= 0)
407     return -1;
408
409   /* another running process that we can signal COULD be
410    * a competing rrdcached */
411   if (pid != getpid() && kill(pid, 0) == 0)
412   {
413     fprintf(stderr,
414             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
415     close(pid_fd);
416     return -1;
417   }
418
419   lseek(pid_fd, 0, SEEK_SET);
420   if (ftruncate(pid_fd, 0) == -1)
421   {
422     fprintf(stderr,
423             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
427
428   fprintf(stderr,
429           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
430           "rrdcached: starting normally.\n", pid);
431
432   return pid_fd;
433 } /* }}} static int check_pidfile */
434
435 static int write_pidfile (int fd) /* {{{ */
436 {
437   pid_t pid;
438   FILE *fh;
439
440   pid = getpid ();
441
442   fh = fdopen (fd, "w");
443   if (fh == NULL)
444   {
445     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
446     close(fd);
447     return (-1);
448   }
449
450   fprintf (fh, "%i\n", (int) pid);
451   fclose (fh);
452
453   return (0);
454 } /* }}} int write_pidfile */
455
456 static int remove_pidfile (void) /* {{{ */
457 {
458   char *file;
459   int status;
460
461   file = (config_pid_file != NULL)
462     ? config_pid_file
463     : LOCALSTATEDIR "/run/rrdcached.pid";
464
465   status = unlink (file);
466   if (status == 0)
467     return (0);
468   return (errno);
469 } /* }}} int remove_pidfile */
470
471 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
472 {
473   char *eol;
474
475   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
476                sock->next_read - sock->next_cmd);
477
478   if (eol == NULL)
479   {
480     /* no commands left, move remainder back to front of rbuf */
481     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
482             sock->next_read - sock->next_cmd);
483     sock->next_read -= sock->next_cmd;
484     sock->next_cmd = 0;
485     *len = 0;
486     return NULL;
487   }
488   else
489   {
490     char *cmd = sock->rbuf + sock->next_cmd;
491     *eol = '\0';
492
493     sock->next_cmd = eol - sock->rbuf + 1;
494
495     if (eol > sock->rbuf && *(eol-1) == '\r')
496       *(--eol) = '\0'; /* handle "\r\n" EOL */
497
498     *len = eol - cmd;
499
500     return cmd;
501   }
502
503   /* NOTREACHED */
504   assert(1==0);
505 } /* }}} char *next_cmd */
506
507 /* add the characters directly to the write buffer */
508 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
509 {
510   char *new_buf;
511
512   assert(sock != NULL);
513
514   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
515   if (new_buf == NULL)
516   {
517     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
518     return -1;
519   }
520
521   strncpy(new_buf + sock->wbuf_len, str, len + 1);
522
523   sock->wbuf = new_buf;
524   sock->wbuf_len += len;
525
526   return 0;
527 } /* }}} static int add_to_wbuf */
528
529 /* add the text to the "extra" info that's sent after the status line */
530 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
531 {
532   va_list argp;
533   char buffer[CMD_MAX];
534   int len;
535
536   if (JOURNAL_REPLAY(sock)) return 0;
537   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
538
539   va_start(argp, fmt);
540 #ifdef HAVE_VSNPRINTF
541   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
542 #else
543   len = vsprintf(buffer, fmt, argp);
544 #endif
545   va_end(argp);
546   if (len < 0)
547   {
548     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
549     return -1;
550   }
551
552   return add_to_wbuf(sock, buffer, len);
553 } /* }}} static int add_response_info */
554
555 static int count_lines(char *str) /* {{{ */
556 {
557   int lines = 0;
558
559   if (str != NULL)
560   {
561     while ((str = strchr(str, '\n')) != NULL)
562     {
563       ++lines;
564       ++str;
565     }
566   }
567
568   return lines;
569 } /* }}} static int count_lines */
570
571 /* send the response back to the user.
572  * returns 0 on success, -1 on error
573  * write buffer is always zeroed after this call */
574 static int send_response (listen_socket_t *sock, response_code rc,
575                           char *fmt, ...) /* {{{ */
576 {
577   va_list argp;
578   char buffer[CMD_MAX];
579   int lines;
580   ssize_t wrote;
581   int rclen, len;
582
583   if (JOURNAL_REPLAY(sock)) return rc;
584
585   if (sock->batch_start)
586   {
587     if (rc == RESP_OK)
588       return rc; /* no response on success during BATCH */
589     lines = sock->batch_cmd;
590   }
591   else if (rc == RESP_OK)
592     lines = count_lines(sock->wbuf);
593   else
594     lines = -1;
595
596   rclen = sprintf(buffer, "%d ", lines);
597   va_start(argp, fmt);
598 #ifdef HAVE_VSNPRINTF
599   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
600 #else
601   len = vsprintf(buffer+rclen, fmt, argp);
602 #endif
603   va_end(argp);
604   if (len < 0)
605     return -1;
606
607   len += rclen;
608
609   /* append the result to the wbuf, don't write to the user */
610   if (sock->batch_start)
611     return add_to_wbuf(sock, buffer, len);
612
613   /* first write must be complete */
614   if (len != write(sock->fd, buffer, len))
615   {
616     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
617     return -1;
618   }
619
620   if (sock->wbuf != NULL && rc == RESP_OK)
621   {
622     wrote = 0;
623     while (wrote < sock->wbuf_len)
624     {
625       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
626       if (wb <= 0)
627       {
628         RRDD_LOG(LOG_INFO, "send_response: could not write results");
629         return -1;
630       }
631       wrote += wb;
632     }
633   }
634
635   free(sock->wbuf); sock->wbuf = NULL;
636   sock->wbuf_len = 0;
637
638   return 0;
639 } /* }}} */
640
641 static void wipe_ci_values(cache_item_t *ci, time_t when)
642 {
643   ci->values = NULL;
644   ci->values_num = 0;
645
646   ci->last_flush_time = when;
647   if (config_write_jitter > 0)
648     ci->last_flush_time += (rrd_random() % config_write_jitter);
649 }
650
651 /* remove_from_queue
652  * remove a "cache_item_t" item from the queue.
653  * must hold 'cache_lock' when calling this
654  */
655 static void remove_from_queue(cache_item_t *ci) /* {{{ */
656 {
657   if (ci == NULL) return;
658   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
659
660   if (ci->prev == NULL)
661     cache_queue_head = ci->next; /* reset head */
662   else
663     ci->prev->next = ci->next;
664
665   if (ci->next == NULL)
666     cache_queue_tail = ci->prev; /* reset the tail */
667   else
668     ci->next->prev = ci->prev;
669
670   ci->next = ci->prev = NULL;
671   ci->flags &= ~CI_FLAGS_IN_QUEUE;
672
673   pthread_mutex_lock (&stats_lock);
674   assert (stats_queue_length > 0);
675   stats_queue_length--;
676   pthread_mutex_unlock (&stats_lock);
677
678 } /* }}} static void remove_from_queue */
679
680 /* free the resources associated with the cache_item_t
681  * must hold cache_lock when calling this function
682  */
683 static void *free_cache_item(cache_item_t *ci) /* {{{ */
684 {
685   if (ci == NULL) return NULL;
686
687   remove_from_queue(ci);
688
689   for (size_t i=0; i < ci->values_num; i++)
690     free(ci->values[i]);
691
692   free (ci->values);
693   free (ci->file);
694
695   /* in case anyone is waiting */
696   pthread_cond_broadcast(&ci->flushed);
697   pthread_cond_destroy(&ci->flushed);
698
699   free (ci);
700
701   return NULL;
702 } /* }}} static void *free_cache_item */
703
704 /*
705  * enqueue_cache_item:
706  * `cache_lock' must be acquired before calling this function!
707  */
708 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
709     queue_side_t side)
710 {
711   if (ci == NULL)
712     return (-1);
713
714   if (ci->values_num == 0)
715     return (0);
716
717   if (side == HEAD)
718   {
719     if (cache_queue_head == ci)
720       return 0;
721
722     /* remove if further down in queue */
723     remove_from_queue(ci);
724
725     ci->prev = NULL;
726     ci->next = cache_queue_head;
727     if (ci->next != NULL)
728       ci->next->prev = ci;
729     cache_queue_head = ci;
730
731     if (cache_queue_tail == NULL)
732       cache_queue_tail = cache_queue_head;
733   }
734   else /* (side == TAIL) */
735   {
736     /* We don't move values back in the list.. */
737     if (ci->flags & CI_FLAGS_IN_QUEUE)
738       return (0);
739
740     assert (ci->next == NULL);
741     assert (ci->prev == NULL);
742
743     ci->prev = cache_queue_tail;
744
745     if (cache_queue_tail == NULL)
746       cache_queue_head = ci;
747     else
748       cache_queue_tail->next = ci;
749
750     cache_queue_tail = ci;
751   }
752
753   ci->flags |= CI_FLAGS_IN_QUEUE;
754
755   pthread_cond_signal(&queue_cond);
756   pthread_mutex_lock (&stats_lock);
757   stats_queue_length++;
758   pthread_mutex_unlock (&stats_lock);
759
760   return (0);
761 } /* }}} int enqueue_cache_item */
762
763 /*
764  * tree_callback_flush:
765  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
766  * while this is in progress.
767  */
768 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
769     gpointer data)
770 {
771   cache_item_t *ci;
772   callback_flush_data_t *cfd;
773
774   ci = (cache_item_t *) value;
775   cfd = (callback_flush_data_t *) data;
776
777   if (ci->flags & CI_FLAGS_IN_QUEUE)
778     return FALSE;
779
780   if (ci->values_num > 0
781       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
782   {
783     enqueue_cache_item (ci, TAIL);
784   }
785   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
786       && (ci->values_num <= 0))
787   {
788     assert ((char *) key == ci->file);
789     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
790     {
791       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
792       return (FALSE);
793     }
794   }
795
796   return (FALSE);
797 } /* }}} gboolean tree_callback_flush */
798
799 static int flush_old_values (int max_age)
800 {
801   callback_flush_data_t cfd;
802   size_t k;
803
804   memset (&cfd, 0, sizeof (cfd));
805   /* Pass the current time as user data so that we don't need to call
806    * `time' for each node. */
807   cfd.now = time (NULL);
808   cfd.keys = NULL;
809   cfd.keys_num = 0;
810
811   if (max_age > 0)
812     cfd.abs_timeout = cfd.now - max_age;
813   else
814     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
815
816   /* `tree_callback_flush' will return the keys of all values that haven't
817    * been touched in the last `config_flush_interval' seconds in `cfd'.
818    * The char*'s in this array point to the same memory as ci->file, so we
819    * don't need to free them separately. */
820   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
821
822   for (k = 0; k < cfd.keys_num; k++)
823   {
824     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
825     /* should never fail, since we have held the cache_lock
826      * the entire time */
827     assert(status == TRUE);
828   }
829
830   if (cfd.keys != NULL)
831   {
832     free (cfd.keys);
833     cfd.keys = NULL;
834   }
835
836   return (0);
837 } /* int flush_old_values */
838
839 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
840 {
841   struct timeval now;
842   struct timespec next_flush;
843   int status;
844
845   gettimeofday (&now, NULL);
846   next_flush.tv_sec = now.tv_sec + config_flush_interval;
847   next_flush.tv_nsec = 1000 * now.tv_usec;
848
849   pthread_mutex_lock(&cache_lock);
850
851   while (state == RUNNING)
852   {
853     gettimeofday (&now, NULL);
854     if ((now.tv_sec > next_flush.tv_sec)
855         || ((now.tv_sec == next_flush.tv_sec)
856           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
857     {
858       RRDD_LOG(LOG_DEBUG, "flushing old values");
859
860       /* Determine the time of the next cache flush. */
861       next_flush.tv_sec = now.tv_sec + config_flush_interval;
862
863       /* Flush all values that haven't been written in the last
864        * `config_write_interval' seconds. */
865       flush_old_values (config_write_interval);
866
867       /* unlock the cache while we rotate so we don't block incoming
868        * updates if the fsync() blocks on disk I/O */
869       pthread_mutex_unlock(&cache_lock);
870       journal_rotate();
871       pthread_mutex_lock(&cache_lock);
872     }
873
874     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
875     if (status != 0 && status != ETIMEDOUT)
876     {
877       RRDD_LOG (LOG_ERR, "flush_thread_main: "
878                 "pthread_cond_timedwait returned %i.", status);
879     }
880   }
881
882   if (config_flush_at_shutdown)
883     flush_old_values (-1); /* flush everything */
884
885   state = SHUTDOWN;
886
887   pthread_mutex_unlock(&cache_lock);
888
889   return NULL;
890 } /* void *flush_thread_main */
891
892 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
893 {
894   pthread_mutex_lock (&cache_lock);
895
896   while (state != SHUTDOWN
897          || (cache_queue_head != NULL && config_flush_at_shutdown))
898   {
899     cache_item_t *ci;
900     char *file;
901     char **values;
902     size_t values_num;
903     int status;
904
905     /* Now, check if there's something to store away. If not, wait until
906      * something comes in. */
907     if (cache_queue_head == NULL)
908     {
909       status = pthread_cond_wait (&queue_cond, &cache_lock);
910       if ((status != 0) && (status != ETIMEDOUT))
911       {
912         RRDD_LOG (LOG_ERR, "queue_thread_main: "
913             "pthread_cond_wait returned %i.", status);
914       }
915     }
916
917     /* Check if a value has arrived. This may be NULL if we timed out or there
918      * was an interrupt such as a signal. */
919     if (cache_queue_head == NULL)
920       continue;
921
922     ci = cache_queue_head;
923
924     /* copy the relevant parts */
925     file = strdup (ci->file);
926     if (file == NULL)
927     {
928       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
929       continue;
930     }
931
932     assert(ci->values != NULL);
933     assert(ci->values_num > 0);
934
935     values = ci->values;
936     values_num = ci->values_num;
937
938     wipe_ci_values(ci, time(NULL));
939     remove_from_queue(ci);
940
941     pthread_mutex_unlock (&cache_lock);
942
943     rrd_clear_error ();
944     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
945     if (status != 0)
946     {
947       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
948           "rrd_update_r (%s) failed with status %i. (%s)",
949           file, status, rrd_get_error());
950     }
951
952     journal_write("wrote", file);
953
954     /* Search again in the tree.  It's possible someone issued a "FORGET"
955      * while we were writing the update values. */
956     pthread_mutex_lock(&cache_lock);
957     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
958     if (ci)
959       pthread_cond_broadcast(&ci->flushed);
960     pthread_mutex_unlock(&cache_lock);
961
962     if (status == 0)
963     {
964       pthread_mutex_lock (&stats_lock);
965       stats_updates_written++;
966       stats_data_sets_written += values_num;
967       pthread_mutex_unlock (&stats_lock);
968     }
969
970     rrd_free_ptrs((void ***) &values, &values_num);
971     free(file);
972
973     pthread_mutex_lock (&cache_lock);
974   }
975   pthread_mutex_unlock (&cache_lock);
976
977   return (NULL);
978 } /* }}} void *queue_thread_main */
979
980 static int buffer_get_field (char **buffer_ret, /* {{{ */
981     size_t *buffer_size_ret, char **field_ret)
982 {
983   char *buffer;
984   size_t buffer_pos;
985   size_t buffer_size;
986   char *field;
987   size_t field_size;
988   int status;
989
990   buffer = *buffer_ret;
991   buffer_pos = 0;
992   buffer_size = *buffer_size_ret;
993   field = *buffer_ret;
994   field_size = 0;
995
996   if (buffer_size <= 0)
997     return (-1);
998
999   /* This is ensured by `handle_request'. */
1000   assert (buffer[buffer_size - 1] == '\0');
1001
1002   status = -1;
1003   while (buffer_pos < buffer_size)
1004   {
1005     /* Check for end-of-field or end-of-buffer */
1006     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1007     {
1008       field[field_size] = 0;
1009       field_size++;
1010       buffer_pos++;
1011       status = 0;
1012       break;
1013     }
1014     /* Handle escaped characters. */
1015     else if (buffer[buffer_pos] == '\\')
1016     {
1017       if (buffer_pos >= (buffer_size - 1))
1018         break;
1019       buffer_pos++;
1020       field[field_size] = buffer[buffer_pos];
1021       field_size++;
1022       buffer_pos++;
1023     }
1024     /* Normal operation */ 
1025     else
1026     {
1027       field[field_size] = buffer[buffer_pos];
1028       field_size++;
1029       buffer_pos++;
1030     }
1031   } /* while (buffer_pos < buffer_size) */
1032
1033   if (status != 0)
1034     return (status);
1035
1036   *buffer_ret = buffer + buffer_pos;
1037   *buffer_size_ret = buffer_size - buffer_pos;
1038   *field_ret = field;
1039
1040   return (0);
1041 } /* }}} int buffer_get_field */
1042
1043 /* if we're restricting writes to the base directory,
1044  * check whether the file falls within the dir
1045  * returns 1 if OK, otherwise 0
1046  */
1047 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1048 {
1049   assert(file != NULL);
1050
1051   if (!config_write_base_only
1052       || JOURNAL_REPLAY(sock)
1053       || config_base_dir == NULL)
1054     return 1;
1055
1056   if (strstr(file, "../") != NULL) goto err;
1057
1058   /* relative paths without "../" are ok */
1059   if (*file != '/') return 1;
1060
1061   /* file must be of the format base + "/" + <1+ char filename> */
1062   if (strlen(file) < _config_base_dir_len + 2) goto err;
1063   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1064   if (*(file + _config_base_dir_len) != '/') goto err;
1065
1066   return 1;
1067
1068 err:
1069   if (sock != NULL && sock->fd >= 0)
1070     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1071
1072   return 0;
1073 } /* }}} static int check_file_access */
1074
1075 /* when using a base dir, convert relative paths to absolute paths.
1076  * if necessary, modifies the "filename" pointer to point
1077  * to the new path created in "tmp".  "tmp" is provided
1078  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1079  *
1080  * this allows us to optimize for the expected case (absolute path)
1081  * with a no-op.
1082  */
1083 static void get_abs_path(char **filename, char *tmp)
1084 {
1085   assert(tmp != NULL);
1086   assert(filename != NULL && *filename != NULL);
1087
1088   if (config_base_dir == NULL || **filename == '/')
1089     return;
1090
1091   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1092   *filename = tmp;
1093 } /* }}} static int get_abs_path */
1094
1095 static int flush_file (const char *filename) /* {{{ */
1096 {
1097   cache_item_t *ci;
1098
1099   pthread_mutex_lock (&cache_lock);
1100
1101   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1102   if (ci == NULL)
1103   {
1104     pthread_mutex_unlock (&cache_lock);
1105     return (ENOENT);
1106   }
1107
1108   if (ci->values_num > 0)
1109   {
1110     /* Enqueue at head */
1111     enqueue_cache_item (ci, HEAD);
1112     pthread_cond_wait(&ci->flushed, &cache_lock);
1113   }
1114
1115   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1116    * may have been purged during our cond_wait() */
1117
1118   pthread_mutex_unlock(&cache_lock);
1119
1120   return (0);
1121 } /* }}} int flush_file */
1122
1123 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1124 {
1125   char *err = "Syntax error.\n";
1126
1127   if (cmd && cmd->syntax)
1128     err = cmd->syntax;
1129
1130   return send_response(sock, RESP_ERR, "Usage: %s", err);
1131 } /* }}} static int syntax_error() */
1132
1133 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1134 {
1135   uint64_t copy_queue_length;
1136   uint64_t copy_updates_received;
1137   uint64_t copy_flush_received;
1138   uint64_t copy_updates_written;
1139   uint64_t copy_data_sets_written;
1140   uint64_t copy_journal_bytes;
1141   uint64_t copy_journal_rotate;
1142
1143   uint64_t tree_nodes_number;
1144   uint64_t tree_depth;
1145
1146   pthread_mutex_lock (&stats_lock);
1147   copy_queue_length       = stats_queue_length;
1148   copy_updates_received   = stats_updates_received;
1149   copy_flush_received     = stats_flush_received;
1150   copy_updates_written    = stats_updates_written;
1151   copy_data_sets_written  = stats_data_sets_written;
1152   copy_journal_bytes      = stats_journal_bytes;
1153   copy_journal_rotate     = stats_journal_rotate;
1154   pthread_mutex_unlock (&stats_lock);
1155
1156   pthread_mutex_lock (&cache_lock);
1157   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1158   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1159   pthread_mutex_unlock (&cache_lock);
1160
1161   add_response_info(sock,
1162                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1163   add_response_info(sock,
1164                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1165   add_response_info(sock,
1166                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1167   add_response_info(sock,
1168                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1169   add_response_info(sock,
1170                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1171   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1172   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1173   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1174   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1175
1176   send_response(sock, RESP_OK, "Statistics follow\n");
1177
1178   return (0);
1179 } /* }}} int handle_request_stats */
1180
1181 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1182 {
1183   char *file, file_tmp[PATH_MAX];
1184   int status;
1185
1186   status = buffer_get_field (&buffer, &buffer_size, &file);
1187   if (status != 0)
1188   {
1189     return syntax_error(sock,cmd);
1190   }
1191   else
1192   {
1193     pthread_mutex_lock(&stats_lock);
1194     stats_flush_received++;
1195     pthread_mutex_unlock(&stats_lock);
1196
1197     get_abs_path(&file, file_tmp);
1198     if (!check_file_access(file, sock)) return 0;
1199
1200     status = flush_file (file);
1201     if (status == 0)
1202       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1203     else if (status == ENOENT)
1204     {
1205       /* no file in our tree; see whether it exists at all */
1206       struct stat statbuf;
1207
1208       memset(&statbuf, 0, sizeof(statbuf));
1209       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1210         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1211       else
1212         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1213     }
1214     else if (status < 0)
1215       return send_response(sock, RESP_ERR, "Internal error.\n");
1216     else
1217       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1218   }
1219
1220   /* NOTREACHED */
1221   assert(1==0);
1222 } /* }}} int handle_request_flush */
1223
1224 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1225 {
1226   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1227
1228   pthread_mutex_lock(&cache_lock);
1229   flush_old_values(-1);
1230   pthread_mutex_unlock(&cache_lock);
1231
1232   return send_response(sock, RESP_OK, "Started flush.\n");
1233 } /* }}} static int handle_request_flushall */
1234
1235 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1236 {
1237   int status;
1238   char *file, file_tmp[PATH_MAX];
1239   cache_item_t *ci;
1240
1241   status = buffer_get_field(&buffer, &buffer_size, &file);
1242   if (status != 0)
1243     return syntax_error(sock,cmd);
1244
1245   get_abs_path(&file, file_tmp);
1246
1247   pthread_mutex_lock(&cache_lock);
1248   ci = g_tree_lookup(cache_tree, file);
1249   if (ci == NULL)
1250   {
1251     pthread_mutex_unlock(&cache_lock);
1252     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1253   }
1254
1255   for (size_t i=0; i < ci->values_num; i++)
1256     add_response_info(sock, "%s\n", ci->values[i]);
1257
1258   pthread_mutex_unlock(&cache_lock);
1259   return send_response(sock, RESP_OK, "updates pending\n");
1260 } /* }}} static int handle_request_pending */
1261
1262 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1263 {
1264   int status;
1265   gboolean found;
1266   char *file, file_tmp[PATH_MAX];
1267
1268   status = buffer_get_field(&buffer, &buffer_size, &file);
1269   if (status != 0)
1270     return syntax_error(sock,cmd);
1271
1272   get_abs_path(&file, file_tmp);
1273   if (!check_file_access(file, sock)) return 0;
1274
1275   pthread_mutex_lock(&cache_lock);
1276   found = g_tree_remove(cache_tree, file);
1277   pthread_mutex_unlock(&cache_lock);
1278
1279   if (found == TRUE)
1280   {
1281     if (!JOURNAL_REPLAY(sock))
1282       journal_write("forget", file);
1283
1284     return send_response(sock, RESP_OK, "Gone!\n");
1285   }
1286   else
1287     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1288
1289   /* NOTREACHED */
1290   assert(1==0);
1291 } /* }}} static int handle_request_forget */
1292
1293 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1294 {
1295   cache_item_t *ci;
1296
1297   pthread_mutex_lock(&cache_lock);
1298
1299   ci = cache_queue_head;
1300   while (ci != NULL)
1301   {
1302     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1303     ci = ci->next;
1304   }
1305
1306   pthread_mutex_unlock(&cache_lock);
1307
1308   return send_response(sock, RESP_OK, "in queue.\n");
1309 } /* }}} int handle_request_queue */
1310
1311 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1312 {
1313   char *file, file_tmp[PATH_MAX];
1314   int values_num = 0;
1315   int status;
1316   char orig_buf[CMD_MAX];
1317
1318   cache_item_t *ci;
1319
1320   /* save it for the journal later */
1321   if (!JOURNAL_REPLAY(sock))
1322     strncpy(orig_buf, buffer, buffer_size);
1323
1324   status = buffer_get_field (&buffer, &buffer_size, &file);
1325   if (status != 0)
1326     return syntax_error(sock,cmd);
1327
1328   pthread_mutex_lock(&stats_lock);
1329   stats_updates_received++;
1330   pthread_mutex_unlock(&stats_lock);
1331
1332   get_abs_path(&file, file_tmp);
1333   if (!check_file_access(file, sock)) return 0;
1334
1335   pthread_mutex_lock (&cache_lock);
1336   ci = g_tree_lookup (cache_tree, file);
1337
1338   if (ci == NULL) /* {{{ */
1339   {
1340     struct stat statbuf;
1341     cache_item_t *tmp;
1342
1343     /* don't hold the lock while we setup; stat(2) might block */
1344     pthread_mutex_unlock(&cache_lock);
1345
1346     memset (&statbuf, 0, sizeof (statbuf));
1347     status = stat (file, &statbuf);
1348     if (status != 0)
1349     {
1350       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1351
1352       status = errno;
1353       if (status == ENOENT)
1354         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1355       else
1356         return send_response(sock, RESP_ERR,
1357                              "stat failed with error %i.\n", status);
1358     }
1359     if (!S_ISREG (statbuf.st_mode))
1360       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1361
1362     if (access(file, R_OK|W_OK) != 0)
1363       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1364                            file, rrd_strerror(errno));
1365
1366     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1367     if (ci == NULL)
1368     {
1369       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1370
1371       return send_response(sock, RESP_ERR, "malloc failed.\n");
1372     }
1373     memset (ci, 0, sizeof (cache_item_t));
1374
1375     ci->file = strdup (file);
1376     if (ci->file == NULL)
1377     {
1378       free (ci);
1379       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1380
1381       return send_response(sock, RESP_ERR, "strdup failed.\n");
1382     }
1383
1384     wipe_ci_values(ci, now);
1385     ci->flags = CI_FLAGS_IN_TREE;
1386     pthread_cond_init(&ci->flushed, NULL);
1387
1388     pthread_mutex_lock(&cache_lock);
1389
1390     /* another UPDATE might have added this entry in the meantime */
1391     tmp = g_tree_lookup (cache_tree, file);
1392     if (tmp == NULL)
1393       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1394     else
1395     {
1396       free_cache_item (ci);
1397       ci = tmp;
1398     }
1399
1400     /* state may have changed while we were unlocked */
1401     if (state == SHUTDOWN)
1402       return -1;
1403   } /* }}} */
1404   assert (ci != NULL);
1405
1406   /* don't re-write updates in replay mode */
1407   if (!JOURNAL_REPLAY(sock))
1408     journal_write("update", orig_buf);
1409
1410   while (buffer_size > 0)
1411   {
1412     char *value;
1413     time_t stamp;
1414     char *eostamp;
1415
1416     status = buffer_get_field (&buffer, &buffer_size, &value);
1417     if (status != 0)
1418     {
1419       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1420       break;
1421     }
1422
1423     /* make sure update time is always moving forward */
1424     stamp = strtol(value, &eostamp, 10);
1425     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1426     {
1427       pthread_mutex_unlock(&cache_lock);
1428       return send_response(sock, RESP_ERR,
1429                            "Cannot find timestamp in '%s'!\n", value);
1430     }
1431     else if (stamp <= ci->last_update_stamp)
1432     {
1433       pthread_mutex_unlock(&cache_lock);
1434       return send_response(sock, RESP_ERR,
1435                            "illegal attempt to update using time %ld when last"
1436                            " update time is %ld (minimum one second step)\n",
1437                            stamp, ci->last_update_stamp);
1438     }
1439     else
1440       ci->last_update_stamp = stamp;
1441
1442     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1443     {
1444       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1445       continue;
1446     }
1447
1448     values_num++;
1449   }
1450
1451   if (((now - ci->last_flush_time) >= config_write_interval)
1452       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1453       && (ci->values_num > 0))
1454   {
1455     enqueue_cache_item (ci, TAIL);
1456   }
1457
1458   pthread_mutex_unlock (&cache_lock);
1459
1460   if (values_num < 1)
1461     return send_response(sock, RESP_ERR, "No values updated.\n");
1462   else
1463     return send_response(sock, RESP_OK,
1464                          "errors, enqueued %i value(s).\n", values_num);
1465
1466   /* NOTREACHED */
1467   assert(1==0);
1468
1469 } /* }}} int handle_request_update */
1470
1471 /* we came across a "WROTE" entry during journal replay.
1472  * throw away any values that we have accumulated for this file
1473  */
1474 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1475 {
1476   cache_item_t *ci;
1477   const char *file = buffer;
1478
1479   pthread_mutex_lock(&cache_lock);
1480
1481   ci = g_tree_lookup(cache_tree, file);
1482   if (ci == NULL)
1483   {
1484     pthread_mutex_unlock(&cache_lock);
1485     return (0);
1486   }
1487
1488   if (ci->values)
1489     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1490
1491   wipe_ci_values(ci, now);
1492   remove_from_queue(ci);
1493
1494   pthread_mutex_unlock(&cache_lock);
1495   return (0);
1496 } /* }}} int handle_request_wrote */
1497
1498 /* start "BATCH" processing */
1499 static int batch_start (HANDLER_PROTO) /* {{{ */
1500 {
1501   int status;
1502   if (sock->batch_start)
1503     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1504
1505   status = send_response(sock, RESP_OK,
1506                          "Go ahead.  End with dot '.' on its own line.\n");
1507   sock->batch_start = time(NULL);
1508   sock->batch_cmd = 0;
1509
1510   return status;
1511 } /* }}} static int batch_start */
1512
1513 /* finish "BATCH" processing and return results to the client */
1514 static int batch_done (HANDLER_PROTO) /* {{{ */
1515 {
1516   assert(sock->batch_start);
1517   sock->batch_start = 0;
1518   sock->batch_cmd  = 0;
1519   return send_response(sock, RESP_OK, "errors\n");
1520 } /* }}} static int batch_done */
1521
1522 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1523 {
1524   return -1;
1525 } /* }}} static int handle_request_quit */
1526
1527 static command_t list_of_commands[] = { /* {{{ */
1528   {
1529     "UPDATE",
1530     handle_request_update,
1531     CMD_CONTEXT_ANY,
1532     "UPDATE <filename> <values> [<values> ...]\n"
1533     ,
1534     "Adds the given file to the internal cache if it is not yet known and\n"
1535     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1536     "for details.\n"
1537     "\n"
1538     "Each <values> has the following form:\n"
1539     "  <values> = <time>:<value>[:<value>[...]]\n"
1540     "See the rrdupdate(1) manpage for details.\n"
1541   },
1542   {
1543     "WROTE",
1544     handle_request_wrote,
1545     CMD_CONTEXT_JOURNAL,
1546     NULL,
1547     NULL
1548   },
1549   {
1550     "FLUSH",
1551     handle_request_flush,
1552     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1553     "FLUSH <filename>\n"
1554     ,
1555     "Adds the given filename to the head of the update queue and returns\n"
1556     "after it has been dequeued.\n"
1557   },
1558   {
1559     "FLUSHALL",
1560     handle_request_flushall,
1561     CMD_CONTEXT_CLIENT,
1562     "FLUSHALL\n"
1563     ,
1564     "Triggers writing of all pending updates.  Returns immediately.\n"
1565   },
1566   {
1567     "PENDING",
1568     handle_request_pending,
1569     CMD_CONTEXT_CLIENT,
1570     "PENDING <filename>\n"
1571     ,
1572     "Shows any 'pending' updates for a file, in order.\n"
1573     "The updates shown have not yet been written to the underlying RRD file.\n"
1574   },
1575   {
1576     "FORGET",
1577     handle_request_forget,
1578     CMD_CONTEXT_ANY,
1579     "FORGET <filename>\n"
1580     ,
1581     "Removes the file completely from the cache.\n"
1582     "Any pending updates for the file will be lost.\n"
1583   },
1584   {
1585     "QUEUE",
1586     handle_request_queue,
1587     CMD_CONTEXT_CLIENT,
1588     "QUEUE\n"
1589     ,
1590         "Shows all files in the output queue.\n"
1591     "The output is zero or more lines in the following format:\n"
1592     "(where <num_vals> is the number of values to be written)\n"
1593     "\n"
1594     "<num_vals> <filename>\n"
1595   },
1596   {
1597     "STATS",
1598     handle_request_stats,
1599     CMD_CONTEXT_CLIENT,
1600     "STATS\n"
1601     ,
1602     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1603     "a description of the values.\n"
1604   },
1605   {
1606     "HELP",
1607     handle_request_help,
1608     CMD_CONTEXT_CLIENT,
1609     "HELP [<command>]\n",
1610     NULL, /* special! */
1611   },
1612   {
1613     "BATCH",
1614     batch_start,
1615     CMD_CONTEXT_CLIENT,
1616     "BATCH\n"
1617     ,
1618     "The 'BATCH' command permits the client to initiate a bulk load\n"
1619     "   of commands to rrdcached.\n"
1620     "\n"
1621     "Usage:\n"
1622     "\n"
1623     "    client: BATCH\n"
1624     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1625     "    client: command #1\n"
1626     "    client: command #2\n"
1627     "    client: ... and so on\n"
1628     "    client: .\n"
1629     "    server: 2 errors\n"
1630     "    server: 7 message for command #7\n"
1631     "    server: 9 message for command #9\n"
1632     "\n"
1633     "For more information, consult the rrdcached(1) documentation.\n"
1634   },
1635   {
1636     ".",   /* BATCH terminator */
1637     batch_done,
1638     CMD_CONTEXT_BATCH,
1639     NULL,
1640     NULL
1641   },
1642   {
1643     "QUIT",
1644     handle_request_quit,
1645     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1646     "QUIT\n"
1647     ,
1648     "Disconnect from rrdcached.\n"
1649   }
1650 }; /* }}} command_t list_of_commands[] */
1651 static size_t list_of_commands_len = sizeof (list_of_commands)
1652   / sizeof (list_of_commands[0]);
1653
1654 static command_t *find_command(char *cmd)
1655 {
1656   size_t i;
1657
1658   for (i = 0; i < list_of_commands_len; i++)
1659     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1660       return (&list_of_commands[i]);
1661   return NULL;
1662 }
1663
1664 /* We currently use the index in the `list_of_commands' array as a bit position
1665  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1666  * outside these functions so that switching to a more elegant storage method
1667  * is easily possible. */
1668 static ssize_t find_command_index (const char *cmd) /* {{{ */
1669 {
1670   size_t i;
1671
1672   for (i = 0; i < list_of_commands_len; i++)
1673     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1674       return ((ssize_t) i);
1675   return (-1);
1676 } /* }}} ssize_t find_command_index */
1677
1678 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1679     const char *cmd)
1680 {
1681   ssize_t i;
1682
1683   if (JOURNAL_REPLAY(sock))
1684     return (1);
1685
1686   if (cmd == NULL)
1687     return (-1);
1688
1689   if ((strcasecmp ("QUIT", cmd) == 0)
1690       || (strcasecmp ("HELP", cmd) == 0))
1691     return (1);
1692   else if (strcmp (".", cmd) == 0)
1693     cmd = "BATCH";
1694
1695   i = find_command_index (cmd);
1696   if (i < 0)
1697     return (-1);
1698   assert (i < 32);
1699
1700   if ((sock->permissions & (1 << i)) != 0)
1701     return (1);
1702   return (0);
1703 } /* }}} int socket_permission_check */
1704
1705 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1706     const char *cmd)
1707 {
1708   ssize_t i;
1709
1710   i = find_command_index (cmd);
1711   if (i < 0)
1712     return (-1);
1713   assert (i < 32);
1714
1715   sock->permissions |= (1 << i);
1716   return (0);
1717 } /* }}} int socket_permission_add */
1718
1719 /* check whether commands are received in the expected context */
1720 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1721 {
1722   if (JOURNAL_REPLAY(sock))
1723     return (cmd->context & CMD_CONTEXT_JOURNAL);
1724   else if (sock->batch_start)
1725     return (cmd->context & CMD_CONTEXT_BATCH);
1726   else
1727     return (cmd->context & CMD_CONTEXT_CLIENT);
1728
1729   /* NOTREACHED */
1730   assert(1==0);
1731 }
1732
1733 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1734 {
1735   int status;
1736   char *cmd_str;
1737   char *resp_txt;
1738   command_t *help = NULL;
1739
1740   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1741   if (status == 0)
1742     help = find_command(cmd_str);
1743
1744   if (help && (help->syntax || help->help))
1745   {
1746     char tmp[CMD_MAX];
1747
1748     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1749     resp_txt = tmp;
1750
1751     if (help->syntax)
1752       add_response_info(sock, "Usage: %s\n", help->syntax);
1753
1754     if (help->help)
1755       add_response_info(sock, "%s\n", help->help);
1756   }
1757   else
1758   {
1759     size_t i;
1760
1761     resp_txt = "Command overview\n";
1762
1763     for (i = 0; i < list_of_commands_len; i++)
1764     {
1765       if (list_of_commands[i].syntax == NULL)
1766         continue;
1767       add_response_info (sock, "%s", list_of_commands[i].syntax);
1768     }
1769   }
1770
1771   return send_response(sock, RESP_OK, resp_txt);
1772 } /* }}} int handle_request_help */
1773
1774 static int handle_request (DISPATCH_PROTO) /* {{{ */
1775 {
1776   char *buffer_ptr = buffer;
1777   char *cmd_str = NULL;
1778   command_t *cmd = NULL;
1779   int status;
1780
1781   assert (buffer[buffer_size - 1] == '\0');
1782
1783   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1784   if (status != 0)
1785   {
1786     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1787     return (-1);
1788   }
1789
1790   if (sock != NULL && sock->batch_start)
1791     sock->batch_cmd++;
1792
1793   cmd = find_command(cmd_str);
1794   if (!cmd)
1795     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1796
1797   if (!socket_permission_check (sock, cmd->cmd))
1798     return send_response(sock, RESP_ERR, "Permission denied.\n");
1799
1800   if (!command_check_context(sock, cmd))
1801     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1802
1803   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1804 } /* }}} int handle_request */
1805
1806 static void journal_set_free (journal_set *js) /* {{{ */
1807 {
1808   if (js == NULL)
1809     return;
1810
1811   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1812
1813   free(js);
1814 } /* }}} journal_set_free */
1815
1816 static void journal_set_remove (journal_set *js) /* {{{ */
1817 {
1818   if (js == NULL)
1819     return;
1820
1821   for (uint i=0; i < js->files_num; i++)
1822   {
1823     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1824     unlink(js->files[i]);
1825   }
1826 } /* }}} journal_set_remove */
1827
1828 /* close current journal file handle.
1829  * MUST hold journal_lock before calling */
1830 static void journal_close(void) /* {{{ */
1831 {
1832   if (journal_fh != NULL)
1833   {
1834     if (fclose(journal_fh) != 0)
1835       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1836   }
1837
1838   journal_fh = NULL;
1839   journal_size = 0;
1840 } /* }}} journal_close */
1841
1842 /* MUST hold journal_lock before calling */
1843 static void journal_new_file(void) /* {{{ */
1844 {
1845   struct timeval now;
1846   int  new_fd;
1847   char new_file[PATH_MAX + 1];
1848
1849   assert(journal_dir != NULL);
1850   assert(journal_cur != NULL);
1851
1852   journal_close();
1853
1854   gettimeofday(&now, NULL);
1855   /* this format assures that the files sort in strcmp() order */
1856   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1857            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1858
1859   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1860                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1861   if (new_fd < 0)
1862     goto error;
1863
1864   journal_fh = fdopen(new_fd, "a");
1865   if (journal_fh == NULL)
1866     goto error;
1867
1868   journal_size = ftell(journal_fh);
1869   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1870
1871   /* record the file in the journal set */
1872   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1873
1874   return;
1875
1876 error:
1877   RRDD_LOG(LOG_CRIT,
1878            "JOURNALING DISABLED: Error while trying to create %s : %s",
1879            new_file, rrd_strerror(errno));
1880   RRDD_LOG(LOG_CRIT,
1881            "JOURNALING DISABLED: All values will be flushed at shutdown");
1882
1883   close(new_fd);
1884   config_flush_at_shutdown = 1;
1885
1886 } /* }}} journal_new_file */
1887
1888 /* MUST NOT hold journal_lock before calling this */
1889 static void journal_rotate(void) /* {{{ */
1890 {
1891   journal_set *old_js = NULL;
1892
1893   if (journal_dir == NULL)
1894     return;
1895
1896   RRDD_LOG(LOG_DEBUG, "rotating journals");
1897
1898   pthread_mutex_lock(&stats_lock);
1899   ++stats_journal_rotate;
1900   pthread_mutex_unlock(&stats_lock);
1901
1902   pthread_mutex_lock(&journal_lock);
1903
1904   journal_close();
1905
1906   /* rotate the journal sets */
1907   old_js = journal_old;
1908   journal_old = journal_cur;
1909   journal_cur = calloc(1, sizeof(journal_set));
1910
1911   if (journal_cur != NULL)
1912     journal_new_file();
1913   else
1914     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1915
1916   pthread_mutex_unlock(&journal_lock);
1917
1918   journal_set_remove(old_js);
1919   journal_set_free  (old_js);
1920
1921 } /* }}} static void journal_rotate */
1922
1923 /* MUST hold journal_lock when calling */
1924 static void journal_done(void) /* {{{ */
1925 {
1926   if (journal_cur == NULL)
1927     return;
1928
1929   journal_close();
1930
1931   if (config_flush_at_shutdown)
1932   {
1933     RRDD_LOG(LOG_INFO, "removing journals");
1934     journal_set_remove(journal_old);
1935     journal_set_remove(journal_cur);
1936   }
1937   else
1938   {
1939     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1940              "journals will be used at next startup");
1941   }
1942
1943   journal_set_free(journal_cur);
1944   journal_set_free(journal_old);
1945   free(journal_dir);
1946
1947 } /* }}} static void journal_done */
1948
1949 static int journal_write(char *cmd, char *args) /* {{{ */
1950 {
1951   int chars;
1952
1953   if (journal_fh == NULL)
1954     return 0;
1955
1956   pthread_mutex_lock(&journal_lock);
1957   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1958   journal_size += chars;
1959
1960   if (journal_size > JOURNAL_MAX)
1961     journal_new_file();
1962
1963   pthread_mutex_unlock(&journal_lock);
1964
1965   if (chars > 0)
1966   {
1967     pthread_mutex_lock(&stats_lock);
1968     stats_journal_bytes += chars;
1969     pthread_mutex_unlock(&stats_lock);
1970   }
1971
1972   return chars;
1973 } /* }}} static int journal_write */
1974
1975 static int journal_replay (const char *file) /* {{{ */
1976 {
1977   FILE *fh;
1978   int entry_cnt = 0;
1979   int fail_cnt = 0;
1980   uint64_t line = 0;
1981   char entry[CMD_MAX];
1982   time_t now;
1983
1984   if (file == NULL) return 0;
1985
1986   {
1987     char *reason = "unknown error";
1988     int status = 0;
1989     struct stat statbuf;
1990
1991     memset(&statbuf, 0, sizeof(statbuf));
1992     if (stat(file, &statbuf) != 0)
1993     {
1994       reason = "stat error";
1995       status = errno;
1996     }
1997     else if (!S_ISREG(statbuf.st_mode))
1998     {
1999       reason = "not a regular file";
2000       status = EPERM;
2001     }
2002     if (statbuf.st_uid != daemon_uid)
2003     {
2004       reason = "not owned by daemon user";
2005       status = EACCES;
2006     }
2007     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2008     {
2009       reason = "must not be user/group writable";
2010       status = EACCES;
2011     }
2012
2013     if (status != 0)
2014     {
2015       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2016                file, rrd_strerror(status), reason);
2017       return 0;
2018     }
2019   }
2020
2021   fh = fopen(file, "r");
2022   if (fh == NULL)
2023   {
2024     if (errno != ENOENT)
2025       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2026                file, rrd_strerror(errno));
2027     return 0;
2028   }
2029   else
2030     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2031
2032   now = time(NULL);
2033
2034   while(!feof(fh))
2035   {
2036     size_t entry_len;
2037
2038     ++line;
2039     if (fgets(entry, sizeof(entry), fh) == NULL)
2040       break;
2041     entry_len = strlen(entry);
2042
2043     /* check \n termination in case journal writing crashed mid-line */
2044     if (entry_len == 0)
2045       continue;
2046     else if (entry[entry_len - 1] != '\n')
2047     {
2048       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2049       ++fail_cnt;
2050       continue;
2051     }
2052
2053     entry[entry_len - 1] = '\0';
2054
2055     if (handle_request(NULL, now, entry, entry_len) == 0)
2056       ++entry_cnt;
2057     else
2058       ++fail_cnt;
2059   }
2060
2061   fclose(fh);
2062
2063   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2064            entry_cnt, fail_cnt);
2065
2066   return entry_cnt > 0 ? 1 : 0;
2067 } /* }}} static int journal_replay */
2068
2069 static int journal_sort(const void *v1, const void *v2)
2070 {
2071   char **jn1 = (char **) v1;
2072   char **jn2 = (char **) v2;
2073
2074   return strcmp(*jn1,*jn2);
2075 }
2076
2077 static void journal_init(void) /* {{{ */
2078 {
2079   int had_journal = 0;
2080   DIR *dir;
2081   struct dirent *dent;
2082   char path[PATH_MAX+1];
2083
2084   if (journal_dir == NULL) return;
2085
2086   pthread_mutex_lock(&journal_lock);
2087
2088   journal_cur = calloc(1, sizeof(journal_set));
2089   if (journal_cur == NULL)
2090   {
2091     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2092     return;
2093   }
2094
2095   RRDD_LOG(LOG_INFO, "checking for journal files");
2096
2097   /* Handle old journal files during transition.  This gives them the
2098    * correct sort order.  TODO: remove after first release
2099    */
2100   {
2101     char old_path[PATH_MAX+1];
2102     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2103     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2104     rename(old_path, path);
2105
2106     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2107     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2108     rename(old_path, path);
2109   }
2110
2111   dir = opendir(journal_dir);
2112   while ((dent = readdir(dir)) != NULL)
2113   {
2114     /* looks like a journal file? */
2115     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2116       continue;
2117
2118     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2119
2120     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2121     {
2122       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2123                dent->d_name);
2124       break;
2125     }
2126   }
2127   closedir(dir);
2128
2129   qsort(journal_cur->files, journal_cur->files_num,
2130         sizeof(journal_cur->files[0]), journal_sort);
2131
2132   for (uint i=0; i < journal_cur->files_num; i++)
2133     had_journal += journal_replay(journal_cur->files[i]);
2134
2135   journal_new_file();
2136
2137   /* it must have been a crash.  start a flush */
2138   if (had_journal && config_flush_at_shutdown)
2139     flush_old_values(-1);
2140
2141   pthread_mutex_unlock(&journal_lock);
2142
2143   RRDD_LOG(LOG_INFO, "journal processing complete");
2144
2145 } /* }}} static void journal_init */
2146
2147 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2148 {
2149   assert(sock != NULL);
2150
2151   free(sock->rbuf);  sock->rbuf = NULL;
2152   free(sock->wbuf);  sock->wbuf = NULL;
2153   free(sock);
2154 } /* }}} void free_listen_socket */
2155
2156 static void close_connection(listen_socket_t *sock) /* {{{ */
2157 {
2158   if (sock->fd >= 0)
2159   {
2160     close(sock->fd);
2161     sock->fd = -1;
2162   }
2163
2164   free_listen_socket(sock);
2165
2166 } /* }}} void close_connection */
2167
2168 static void *connection_thread_main (void *args) /* {{{ */
2169 {
2170   listen_socket_t *sock;
2171   int fd;
2172
2173   sock = (listen_socket_t *) args;
2174   fd = sock->fd;
2175
2176   /* init read buffers */
2177   sock->next_read = sock->next_cmd = 0;
2178   sock->rbuf = malloc(RBUF_SIZE);
2179   if (sock->rbuf == NULL)
2180   {
2181     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2182     close_connection(sock);
2183     return NULL;
2184   }
2185
2186   pthread_mutex_lock (&connection_threads_lock);
2187   connection_threads_num++;
2188   pthread_mutex_unlock (&connection_threads_lock);
2189
2190   while (state == RUNNING)
2191   {
2192     char *cmd;
2193     ssize_t cmd_len;
2194     ssize_t rbytes;
2195     time_t now;
2196
2197     struct pollfd pollfd;
2198     int status;
2199
2200     pollfd.fd = fd;
2201     pollfd.events = POLLIN | POLLPRI;
2202     pollfd.revents = 0;
2203
2204     status = poll (&pollfd, 1, /* timeout = */ 500);
2205     if (state != RUNNING)
2206       break;
2207     else if (status == 0) /* timeout */
2208       continue;
2209     else if (status < 0) /* error */
2210     {
2211       status = errno;
2212       if (status != EINTR)
2213         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2214       continue;
2215     }
2216
2217     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2218       break;
2219     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2220     {
2221       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2222           "poll(2) returned something unexpected: %#04hx",
2223           pollfd.revents);
2224       break;
2225     }
2226
2227     rbytes = read(fd, sock->rbuf + sock->next_read,
2228                   RBUF_SIZE - sock->next_read);
2229     if (rbytes < 0)
2230     {
2231       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2232       break;
2233     }
2234     else if (rbytes == 0)
2235       break; /* eof */
2236
2237     sock->next_read += rbytes;
2238
2239     if (sock->batch_start)
2240       now = sock->batch_start;
2241     else
2242       now = time(NULL);
2243
2244     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2245     {
2246       status = handle_request (sock, now, cmd, cmd_len+1);
2247       if (status != 0)
2248         goto out_close;
2249     }
2250   }
2251
2252 out_close:
2253   close_connection(sock);
2254
2255   /* Remove this thread from the connection threads list */
2256   pthread_mutex_lock (&connection_threads_lock);
2257   connection_threads_num--;
2258   if (connection_threads_num <= 0)
2259     pthread_cond_broadcast(&connection_threads_done);
2260   pthread_mutex_unlock (&connection_threads_lock);
2261
2262   return (NULL);
2263 } /* }}} void *connection_thread_main */
2264
2265 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2266 {
2267   int fd;
2268   struct sockaddr_un sa;
2269   listen_socket_t *temp;
2270   int status;
2271   const char *path;
2272   char *path_copy, *dir;
2273
2274   path = sock->addr;
2275   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2276     path += strlen("unix:");
2277
2278   /* dirname may modify its argument */
2279   path_copy = strdup(path);
2280   if (path_copy == NULL)
2281   {
2282     fprintf(stderr, "rrdcached: strdup(): %s\n",
2283         rrd_strerror(errno));
2284     return (-1);
2285   }
2286
2287   dir = dirname(path_copy);
2288   if (rrd_mkdir_p(dir, 0777) != 0)
2289   {
2290     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2291         dir, rrd_strerror(errno));
2292     return (-1);
2293   }
2294
2295   free(path_copy);
2296
2297   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2298       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2299   if (temp == NULL)
2300   {
2301     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2302     return (-1);
2303   }
2304   listen_fds = temp;
2305   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2306
2307   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2308   if (fd < 0)
2309   {
2310     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2311              rrd_strerror(errno));
2312     return (-1);
2313   }
2314
2315   memset (&sa, 0, sizeof (sa));
2316   sa.sun_family = AF_UNIX;
2317   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2318
2319   /* if we've gotten this far, we own the pid file.  any daemon started
2320    * with the same args must not be alive.  therefore, ensure that we can
2321    * create the socket...
2322    */
2323   unlink(path);
2324
2325   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2326   if (status != 0)
2327   {
2328     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2329              path, rrd_strerror(errno));
2330     close (fd);
2331     return (-1);
2332   }
2333
2334   /* tweak the sockets group ownership */
2335   if (sock->socket_group != (gid_t)-1)
2336   {
2337     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2338          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2339     {
2340       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2341     }
2342   }
2343
2344   if (sock->socket_permissions != (mode_t)-1)
2345   {
2346     if (chmod(path, sock->socket_permissions) != 0)
2347       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2348           (unsigned int)sock->socket_permissions, strerror(errno));
2349   }
2350
2351   status = listen (fd, /* backlog = */ 10);
2352   if (status != 0)
2353   {
2354     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2355              path, rrd_strerror(errno));
2356     close (fd);
2357     unlink (path);
2358     return (-1);
2359   }
2360
2361   listen_fds[listen_fds_num].fd = fd;
2362   listen_fds[listen_fds_num].family = PF_UNIX;
2363   strncpy(listen_fds[listen_fds_num].addr, path,
2364           sizeof (listen_fds[listen_fds_num].addr) - 1);
2365   listen_fds_num++;
2366
2367   return (0);
2368 } /* }}} int open_listen_socket_unix */
2369
2370 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2371 {
2372   struct addrinfo ai_hints;
2373   struct addrinfo *ai_res;
2374   struct addrinfo *ai_ptr;
2375   char addr_copy[NI_MAXHOST];
2376   char *addr;
2377   char *port;
2378   int status;
2379
2380   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2381   addr_copy[sizeof (addr_copy) - 1] = 0;
2382   addr = addr_copy;
2383
2384   memset (&ai_hints, 0, sizeof (ai_hints));
2385   ai_hints.ai_flags = 0;
2386 #ifdef AI_ADDRCONFIG
2387   ai_hints.ai_flags |= AI_ADDRCONFIG;
2388 #endif
2389   ai_hints.ai_family = AF_UNSPEC;
2390   ai_hints.ai_socktype = SOCK_STREAM;
2391
2392   port = NULL;
2393   if (*addr == '[') /* IPv6+port format */
2394   {
2395     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2396     addr++;
2397
2398     port = strchr (addr, ']');
2399     if (port == NULL)
2400     {
2401       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2402       return (-1);
2403     }
2404     *port = 0;
2405     port++;
2406
2407     if (*port == ':')
2408       port++;
2409     else if (*port == 0)
2410       port = NULL;
2411     else
2412     {
2413       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2414       return (-1);
2415     }
2416   } /* if (*addr == '[') */
2417   else
2418   {
2419     port = rindex(addr, ':');
2420     if (port != NULL)
2421     {
2422       *port = 0;
2423       port++;
2424     }
2425   }
2426   ai_res = NULL;
2427   status = getaddrinfo (addr,
2428                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2429                         &ai_hints, &ai_res);
2430   if (status != 0)
2431   {
2432     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2433              addr, gai_strerror (status));
2434     return (-1);
2435   }
2436
2437   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2438   {
2439     int fd;
2440     listen_socket_t *temp;
2441     int one = 1;
2442
2443     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2444         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2445     if (temp == NULL)
2446     {
2447       fprintf (stderr,
2448                "rrdcached: open_listen_socket_network: realloc failed.\n");
2449       continue;
2450     }
2451     listen_fds = temp;
2452     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2453
2454     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2455     if (fd < 0)
2456     {
2457       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2458                rrd_strerror(errno));
2459       continue;
2460     }
2461
2462     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2463
2464     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2465     if (status != 0)
2466     {
2467       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2468                sock->addr, rrd_strerror(errno));
2469       close (fd);
2470       continue;
2471     }
2472
2473     status = listen (fd, /* backlog = */ 10);
2474     if (status != 0)
2475     {
2476       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2477                sock->addr, rrd_strerror(errno));
2478       close (fd);
2479       freeaddrinfo(ai_res);
2480       return (-1);
2481     }
2482
2483     listen_fds[listen_fds_num].fd = fd;
2484     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2485     listen_fds_num++;
2486   } /* for (ai_ptr) */
2487
2488   freeaddrinfo(ai_res);
2489   return (0);
2490 } /* }}} static int open_listen_socket_network */
2491
2492 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2493 {
2494   assert(sock != NULL);
2495   assert(sock->addr != NULL);
2496
2497   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2498       || sock->addr[0] == '/')
2499     return (open_listen_socket_unix(sock));
2500   else
2501     return (open_listen_socket_network(sock));
2502 } /* }}} int open_listen_socket */
2503
2504 static int close_listen_sockets (void) /* {{{ */
2505 {
2506   size_t i;
2507
2508   for (i = 0; i < listen_fds_num; i++)
2509   {
2510     close (listen_fds[i].fd);
2511
2512     if (listen_fds[i].family == PF_UNIX)
2513       unlink(listen_fds[i].addr);
2514   }
2515
2516   free (listen_fds);
2517   listen_fds = NULL;
2518   listen_fds_num = 0;
2519
2520   return (0);
2521 } /* }}} int close_listen_sockets */
2522
2523 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2524 {
2525   struct pollfd *pollfds;
2526   int pollfds_num;
2527   int status;
2528   int i;
2529
2530   if (listen_fds_num < 1)
2531   {
2532     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2533     return (NULL);
2534   }
2535
2536   pollfds_num = listen_fds_num;
2537   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2538   if (pollfds == NULL)
2539   {
2540     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2541     return (NULL);
2542   }
2543   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2544
2545   RRDD_LOG(LOG_INFO, "listening for connections");
2546
2547   while (state == RUNNING)
2548   {
2549     for (i = 0; i < pollfds_num; i++)
2550     {
2551       pollfds[i].fd = listen_fds[i].fd;
2552       pollfds[i].events = POLLIN | POLLPRI;
2553       pollfds[i].revents = 0;
2554     }
2555
2556     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2557     if (state != RUNNING)
2558       break;
2559     else if (status == 0) /* timeout */
2560       continue;
2561     else if (status < 0) /* error */
2562     {
2563       status = errno;
2564       if (status != EINTR)
2565       {
2566         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2567       }
2568       continue;
2569     }
2570
2571     for (i = 0; i < pollfds_num; i++)
2572     {
2573       listen_socket_t *client_sock;
2574       struct sockaddr_storage client_sa;
2575       socklen_t client_sa_size;
2576       pthread_t tid;
2577       pthread_attr_t attr;
2578
2579       if (pollfds[i].revents == 0)
2580         continue;
2581
2582       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2583       {
2584         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2585             "poll(2) returned something unexpected for listen FD #%i.",
2586             pollfds[i].fd);
2587         continue;
2588       }
2589
2590       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2591       if (client_sock == NULL)
2592       {
2593         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2594         continue;
2595       }
2596       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2597
2598       client_sa_size = sizeof (client_sa);
2599       client_sock->fd = accept (pollfds[i].fd,
2600           (struct sockaddr *) &client_sa, &client_sa_size);
2601       if (client_sock->fd < 0)
2602       {
2603         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2604         free(client_sock);
2605         continue;
2606       }
2607
2608       pthread_attr_init (&attr);
2609       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2610
2611       status = pthread_create (&tid, &attr, connection_thread_main,
2612                                client_sock);
2613       if (status != 0)
2614       {
2615         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2616         close_connection(client_sock);
2617         continue;
2618       }
2619     } /* for (pollfds_num) */
2620   } /* while (state == RUNNING) */
2621
2622   RRDD_LOG(LOG_INFO, "starting shutdown");
2623
2624   close_listen_sockets ();
2625
2626   pthread_mutex_lock (&connection_threads_lock);
2627   while (connection_threads_num > 0)
2628     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2629   pthread_mutex_unlock (&connection_threads_lock);
2630
2631   free(pollfds);
2632
2633   return (NULL);
2634 } /* }}} void *listen_thread_main */
2635
2636 static int daemonize (void) /* {{{ */
2637 {
2638   int pid_fd;
2639   char *base_dir;
2640
2641   daemon_uid = geteuid();
2642
2643   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2644   if (pid_fd < 0)
2645     pid_fd = check_pidfile();
2646   if (pid_fd < 0)
2647     return pid_fd;
2648
2649   /* open all the listen sockets */
2650   if (config_listen_address_list_len > 0)
2651   {
2652     for (size_t i = 0; i < config_listen_address_list_len; i++)
2653       open_listen_socket (config_listen_address_list[i]);
2654
2655     rrd_free_ptrs((void ***) &config_listen_address_list,
2656                   &config_listen_address_list_len);
2657   }
2658   else
2659   {
2660     listen_socket_t sock;
2661     memset(&sock, 0, sizeof(sock));
2662     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2663     open_listen_socket (&sock);
2664   }
2665
2666   if (listen_fds_num < 1)
2667   {
2668     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2669     goto error;
2670   }
2671
2672   if (!stay_foreground)
2673   {
2674     pid_t child;
2675
2676     child = fork ();
2677     if (child < 0)
2678     {
2679       fprintf (stderr, "daemonize: fork(2) failed.\n");
2680       goto error;
2681     }
2682     else if (child > 0)
2683       exit(0);
2684
2685     /* Become session leader */
2686     setsid ();
2687
2688     /* Open the first three file descriptors to /dev/null */
2689     close (2);
2690     close (1);
2691     close (0);
2692
2693     open ("/dev/null", O_RDWR);
2694     if (dup(0) == -1 || dup(0) == -1){
2695         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2696     }
2697   } /* if (!stay_foreground) */
2698
2699   /* Change into the /tmp directory. */
2700   base_dir = (config_base_dir != NULL)
2701     ? config_base_dir
2702     : "/tmp";
2703
2704   if (chdir (base_dir) != 0)
2705   {
2706     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2707     goto error;
2708   }
2709
2710   install_signal_handlers();
2711
2712   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2713   RRDD_LOG(LOG_INFO, "starting up");
2714
2715   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2716                                 (GDestroyNotify) free_cache_item);
2717   if (cache_tree == NULL)
2718   {
2719     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2720     goto error;
2721   }
2722
2723   return write_pidfile (pid_fd);
2724
2725 error:
2726   remove_pidfile();
2727   return -1;
2728 } /* }}} int daemonize */
2729
2730 static int cleanup (void) /* {{{ */
2731 {
2732   pthread_cond_broadcast (&flush_cond);
2733   pthread_join (flush_thread, NULL);
2734
2735   pthread_cond_broadcast (&queue_cond);
2736   for (int i = 0; i < config_queue_threads; i++)
2737     pthread_join (queue_threads[i], NULL);
2738
2739   if (config_flush_at_shutdown)
2740   {
2741     assert(cache_queue_head == NULL);
2742     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2743   }
2744
2745   free(queue_threads);
2746   free(config_base_dir);
2747
2748   pthread_mutex_lock(&cache_lock);
2749   g_tree_destroy(cache_tree);
2750
2751   pthread_mutex_lock(&journal_lock);
2752   journal_done();
2753
2754   RRDD_LOG(LOG_INFO, "goodbye");
2755   closelog ();
2756
2757   remove_pidfile ();
2758   free(config_pid_file);
2759
2760   return (0);
2761 } /* }}} int cleanup */
2762
2763 static int read_options (int argc, char **argv) /* {{{ */
2764 {
2765   int option;
2766   int status = 0;
2767
2768   char **permissions = NULL;
2769   size_t permissions_len = 0;
2770
2771   gid_t  socket_group = (gid_t)-1;
2772   mode_t socket_permissions = (mode_t)-1;
2773
2774   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2775   {
2776     switch (option)
2777     {
2778       case 'g':
2779         stay_foreground=1;
2780         break;
2781
2782       case 'l':
2783       {
2784         listen_socket_t *new;
2785
2786         new = malloc(sizeof(listen_socket_t));
2787         if (new == NULL)
2788         {
2789           fprintf(stderr, "read_options: malloc failed.\n");
2790           return(2);
2791         }
2792         memset(new, 0, sizeof(listen_socket_t));
2793
2794         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2795
2796         /* Add permissions to the socket {{{ */
2797         if (permissions_len != 0)
2798         {
2799           size_t i;
2800           for (i = 0; i < permissions_len; i++)
2801           {
2802             status = socket_permission_add (new, permissions[i]);
2803             if (status != 0)
2804             {
2805               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2806                   "socket failed. Most likely, this permission doesn't "
2807                   "exist. Check your command line.\n", permissions[i]);
2808               status = 4;
2809             }
2810           }
2811         }
2812         else /* if (permissions_len == 0) */
2813         {
2814           /* Add permission for ALL commands to the socket. */
2815           size_t i;
2816           for (i = 0; i < list_of_commands_len; i++)
2817           {
2818             status = socket_permission_add (new, list_of_commands[i].cmd);
2819             if (status != 0)
2820             {
2821               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2822                   "socket failed. This should never happen, ever! Sorry.\n",
2823                   permissions[i]);
2824               status = 4;
2825             }
2826           }
2827         }
2828         /* }}} Done adding permissions. */
2829
2830         new->socket_group = socket_group;
2831         new->socket_permissions = socket_permissions;
2832
2833         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2834                          &config_listen_address_list_len, new))
2835         {
2836           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2837           return (2);
2838         }
2839       }
2840       break;
2841
2842       /* set socket group permissions */
2843       case 's':
2844       {
2845         gid_t group_gid;
2846         struct group *grp;
2847
2848         group_gid = strtoul(optarg, NULL, 10);
2849         if (errno != EINVAL && group_gid>0)
2850         {
2851           /* we were passed a number */
2852           grp = getgrgid(group_gid);
2853         }
2854         else
2855         {
2856           grp = getgrnam(optarg);
2857         }
2858
2859         if (grp)
2860         {
2861           socket_group = grp->gr_gid;
2862         }
2863         else
2864         {
2865           /* no idea what the user wanted... */
2866           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2867           return (5);
2868         }
2869       }
2870       break;
2871
2872       /* set socket file permissions */
2873       case 'm':
2874       {
2875         long  tmp;
2876         char *endptr = NULL;
2877
2878         tmp = strtol (optarg, &endptr, 8);
2879         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2880             || (tmp > 07777) || (tmp < 0)) {
2881           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2882               optarg);
2883           return (5);
2884         }
2885
2886         socket_permissions = (mode_t)tmp;
2887       }
2888       break;
2889
2890       case 'P':
2891       {
2892         char *optcopy;
2893         char *saveptr;
2894         char *dummy;
2895         char *ptr;
2896
2897         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2898
2899         optcopy = strdup (optarg);
2900         dummy = optcopy;
2901         saveptr = NULL;
2902         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2903         {
2904           dummy = NULL;
2905           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2906         }
2907
2908         free (optcopy);
2909       }
2910       break;
2911
2912       case 'f':
2913       {
2914         int temp;
2915
2916         temp = atoi (optarg);
2917         if (temp > 0)
2918           config_flush_interval = temp;
2919         else
2920         {
2921           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2922           status = 3;
2923         }
2924       }
2925       break;
2926
2927       case 'w':
2928       {
2929         int temp;
2930
2931         temp = atoi (optarg);
2932         if (temp > 0)
2933           config_write_interval = temp;
2934         else
2935         {
2936           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2937           status = 2;
2938         }
2939       }
2940       break;
2941
2942       case 'z':
2943       {
2944         int temp;
2945
2946         temp = atoi(optarg);
2947         if (temp > 0)
2948           config_write_jitter = temp;
2949         else
2950         {
2951           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2952           status = 2;
2953         }
2954
2955         break;
2956       }
2957
2958       case 't':
2959       {
2960         int threads;
2961         threads = atoi(optarg);
2962         if (threads >= 1)
2963           config_queue_threads = threads;
2964         else
2965         {
2966           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2967           return 1;
2968         }
2969       }
2970       break;
2971
2972       case 'B':
2973         config_write_base_only = 1;
2974         break;
2975
2976       case 'b':
2977       {
2978         size_t len;
2979         char base_realpath[PATH_MAX];
2980
2981         if (config_base_dir != NULL)
2982           free (config_base_dir);
2983         config_base_dir = strdup (optarg);
2984         if (config_base_dir == NULL)
2985         {
2986           fprintf (stderr, "read_options: strdup failed.\n");
2987           return (3);
2988         }
2989
2990         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2991         {
2992           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2993               config_base_dir, rrd_strerror (errno));
2994           return (3);
2995         }
2996
2997         /* make sure that the base directory is not resolved via
2998          * symbolic links.  this makes some performance-enhancing
2999          * assumptions possible (we don't have to resolve paths
3000          * that start with a "/")
3001          */
3002         if (realpath(config_base_dir, base_realpath) == NULL)
3003         {
3004           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3005               "%s\n", config_base_dir, rrd_strerror(errno));
3006           return 5;
3007         }
3008
3009         len = strlen (config_base_dir);
3010         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3011         {
3012           config_base_dir[len - 1] = 0;
3013           len--;
3014         }
3015
3016         if (len < 1)
3017         {
3018           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3019           return (4);
3020         }
3021
3022         _config_base_dir_len = len;
3023
3024         len = strlen (base_realpath);
3025         while ((len > 0) && (base_realpath[len - 1] == '/'))
3026         {
3027           base_realpath[len - 1] = '\0';
3028           len--;
3029         }
3030
3031         if (strncmp(config_base_dir,
3032                          base_realpath, sizeof(base_realpath)) != 0)
3033         {
3034           fprintf(stderr,
3035                   "Base directory (-b) resolved via file system links!\n"
3036                   "Please consult rrdcached '-b' documentation!\n"
3037                   "Consider specifying the real directory (%s)\n",
3038                   base_realpath);
3039           return 5;
3040         }
3041       }
3042       break;
3043
3044       case 'p':
3045       {
3046         if (config_pid_file != NULL)
3047           free (config_pid_file);
3048         config_pid_file = strdup (optarg);
3049         if (config_pid_file == NULL)
3050         {
3051           fprintf (stderr, "read_options: strdup failed.\n");
3052           return (3);
3053         }
3054       }
3055       break;
3056
3057       case 'F':
3058         config_flush_at_shutdown = 1;
3059         break;
3060
3061       case 'j':
3062       {
3063         const char *dir = journal_dir = strdup(optarg);
3064
3065         status = rrd_mkdir_p(dir, 0777);
3066         if (status != 0)
3067         {
3068           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3069               dir, rrd_strerror(errno));
3070           return 6;
3071         }
3072
3073         if (access(dir, R_OK|W_OK|X_OK) != 0)
3074         {
3075           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3076                   errno ? rrd_strerror(errno) : "");
3077           return 6;
3078         }
3079       }
3080       break;
3081
3082       case 'h':
3083       case '?':
3084         printf ("RRDCacheD %s\n"
3085             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3086             "\n"
3087             "Usage: rrdcached [options]\n"
3088             "\n"
3089             "Valid options are:\n"
3090             "  -l <address>  Socket address to listen to.\n"
3091             "  -P <perms>    Sets the permissions to assign to all following "
3092                             "sockets\n"
3093             "  -w <seconds>  Interval in which to write data.\n"
3094             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3095             "  -t <threads>  Number of write threads.\n"
3096             "  -f <seconds>  Interval in which to flush dead data.\n"
3097             "  -p <file>     Location of the PID-file.\n"
3098             "  -b <dir>      Base directory to change to.\n"
3099             "  -B            Restrict file access to paths within -b <dir>\n"
3100             "  -g            Do not fork and run in the foreground.\n"
3101             "  -j <dir>      Directory in which to create the journal files.\n"
3102             "  -F            Always flush all updates at shutdown\n"
3103             "  -s <id|name>  Make socket g+rw to named group\n"
3104             "\n"
3105             "For more information and a detailed description of all options "
3106             "please refer\n"
3107             "to the rrdcached(1) manual page.\n",
3108             VERSION);
3109         status = -1;
3110         break;
3111     } /* switch (option) */
3112   } /* while (getopt) */
3113
3114   /* advise the user when values are not sane */
3115   if (config_flush_interval < 2 * config_write_interval)
3116     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3117             " 2x write interval (-w) !\n");
3118   if (config_write_jitter > config_write_interval)
3119     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3120             " write interval (-w) !\n");
3121
3122   if (config_write_base_only && config_base_dir == NULL)
3123     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3124             "  Consult the rrdcached documentation\n");
3125
3126   if (journal_dir == NULL)
3127     config_flush_at_shutdown = 1;
3128
3129   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3130
3131   return (status);
3132 } /* }}} int read_options */
3133
3134 int main (int argc, char **argv)
3135 {
3136   int status;
3137
3138   status = read_options (argc, argv);
3139   if (status != 0)
3140   {
3141     if (status < 0)
3142       status = 0;
3143     return (status);
3144   }
3145
3146   status = daemonize ();
3147   if (status != 0)
3148   {
3149     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3150     return (1);
3151   }
3152
3153   journal_init();
3154
3155   /* start the queue threads */
3156   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3157   if (queue_threads == NULL)
3158   {
3159     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3160     cleanup();
3161     return (1);
3162   }
3163   for (int i = 0; i < config_queue_threads; i++)
3164   {
3165     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3166     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3167     if (status != 0)
3168     {
3169       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3170       cleanup();
3171       return (1);
3172     }
3173   }
3174
3175   /* start the flush thread */
3176   memset(&flush_thread, 0, sizeof(flush_thread));
3177   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3178   if (status != 0)
3179   {
3180     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3181     cleanup();
3182     return (1);
3183   }
3184
3185   listen_thread_main (NULL);
3186   cleanup ();
3187
3188   return (0);
3189 } /* int main */
3190
3191 /*
3192  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3193  */