rrd_daemon handle_request_fetch needs to convert to absolute path -- Thorsten von...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #include <glib-2.0/glib.h>
113 /* }}} */
114
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) \
118       fprintf(stderr, __VA_ARGS__); \
119     syslog ((severity), __VA_ARGS__); \
120   } while (0)
121
122 /*
123  * Types
124  */
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126
127 struct listen_socket_s
128 {
129   int fd;
130   char addr[PATH_MAX + 1];
131   int family;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144
145   uint32_t permissions;
146
147   gid_t  socket_group;
148   mode_t socket_permissions;
149 };
150 typedef struct listen_socket_s listen_socket_t;
151
152 struct command_s;
153 typedef struct command_s command_t;
154 /* note: guard against "unused" warnings in the handlers */
155 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
156                         time_t UNUSED(now),\
157                         char  UNUSED(*buffer),\
158                         size_t UNUSED(buffer_size)
159
160 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
161                         DISPATCH_PROTO
162
163 struct command_s {
164   char   *cmd;
165   int (*handler)(HANDLER_PROTO);
166
167   char  context;                /* where we expect to see it */
168 #define CMD_CONTEXT_CLIENT      (1<<0)
169 #define CMD_CONTEXT_BATCH       (1<<1)
170 #define CMD_CONTEXT_JOURNAL     (1<<2)
171 #define CMD_CONTEXT_ANY         (0x7f)
172
173   char *syntax;
174   char *help;
175 };
176
177 struct cache_item_s;
178 typedef struct cache_item_s cache_item_t;
179 struct cache_item_s
180 {
181   char *file;
182   char **values;
183   size_t values_num;            /* number of valid pointers */
184   size_t values_alloc;          /* number of allocated pointers */
185   time_t last_flush_time;
186   time_t last_update_stamp;
187 #define CI_FLAGS_IN_TREE  (1<<0)
188 #define CI_FLAGS_IN_QUEUE (1<<1)
189   int flags;
190   pthread_cond_t  flushed;
191   cache_item_t *prev;
192   cache_item_t *next;
193 };
194
195 struct callback_flush_data_s
196 {
197   time_t now;
198   time_t abs_timeout;
199   char **keys;
200   size_t keys_num;
201 };
202 typedef struct callback_flush_data_s callback_flush_data_t;
203
204 enum queue_side_e
205 {
206   HEAD,
207   TAIL
208 };
209 typedef enum queue_side_e queue_side_t;
210
211 /* describe a set of journal files */
212 typedef struct {
213   char **files;
214   size_t files_num;
215 } journal_set;
216
217 /* max length of socket command or response */
218 #define CMD_MAX 4096
219 #define RBUF_SIZE (CMD_MAX*2)
220
221 /*
222  * Variables
223  */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
226
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
229
230 enum {
231   RUNNING,              /* normal operation */
232   FLUSHING,             /* flushing remaining values */
233   SHUTDOWN              /* shutting down */
234 } state = RUNNING;
235
236 static pthread_t *queue_threads;
237 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
238 static int config_queue_threads = 4;
239
240 static pthread_t flush_thread;
241 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
242
243 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
244 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
245 static int connection_threads_num = 0;
246
247 /* Cache stuff */
248 static GTree          *cache_tree = NULL;
249 static cache_item_t   *cache_queue_head = NULL;
250 static cache_item_t   *cache_queue_tail = NULL;
251 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
252
253 static int config_write_interval = 300;
254 static int config_write_jitter   = 0;
255 static int config_flush_interval = 3600;
256 static int config_flush_at_shutdown = 0;
257 static char *config_pid_file = NULL;
258 static char *config_base_dir = NULL;
259 static size_t _config_base_dir_len = 0;
260 static int config_write_base_only = 0;
261 static size_t config_alloc_chunk = 1;
262
263 static listen_socket_t **config_listen_address_list = NULL;
264 static size_t config_listen_address_list_len = 0;
265
266 static uint64_t stats_queue_length = 0;
267 static uint64_t stats_updates_received = 0;
268 static uint64_t stats_flush_received = 0;
269 static uint64_t stats_updates_written = 0;
270 static uint64_t stats_data_sets_written = 0;
271 static uint64_t stats_journal_bytes = 0;
272 static uint64_t stats_journal_rotate = 0;
273 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
274
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL;         /* current journal file handle */
282 static long  journal_size = 0;          /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
288
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
291
292 /* 
293  * Functions
294  */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298   state = FLUSHING;
299   pthread_cond_broadcast(&flush_cond);
300   pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
302
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
304 {
305   sig_common("INT");
306 } /* }}} void sig_int_handler */
307
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
309 {
310   sig_common("TERM");
311 } /* }}} void sig_term_handler */
312
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
314 {
315   config_flush_at_shutdown = 1;
316   sig_common("USR1");
317 } /* }}} void sig_usr1_handler */
318
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
320 {
321   config_flush_at_shutdown = 0;
322   sig_common("USR2");
323 } /* }}} void sig_usr2_handler */
324
325 static void install_signal_handlers(void) /* {{{ */
326 {
327   /* These structures are static, because `sigaction' behaves weird if the are
328    * overwritten.. */
329   static struct sigaction sa_int;
330   static struct sigaction sa_term;
331   static struct sigaction sa_pipe;
332   static struct sigaction sa_usr1;
333   static struct sigaction sa_usr2;
334
335   /* Install signal handlers */
336   memset (&sa_int, 0, sizeof (sa_int));
337   sa_int.sa_handler = sig_int_handler;
338   sigaction (SIGINT, &sa_int, NULL);
339
340   memset (&sa_term, 0, sizeof (sa_term));
341   sa_term.sa_handler = sig_term_handler;
342   sigaction (SIGTERM, &sa_term, NULL);
343
344   memset (&sa_pipe, 0, sizeof (sa_pipe));
345   sa_pipe.sa_handler = SIG_IGN;
346   sigaction (SIGPIPE, &sa_pipe, NULL);
347
348   memset (&sa_pipe, 0, sizeof (sa_usr1));
349   sa_usr1.sa_handler = sig_usr1_handler;
350   sigaction (SIGUSR1, &sa_usr1, NULL);
351
352   memset (&sa_usr2, 0, sizeof (sa_usr2));
353   sa_usr2.sa_handler = sig_usr2_handler;
354   sigaction (SIGUSR2, &sa_usr2, NULL);
355
356 } /* }}} void install_signal_handlers */
357
358 static int open_pidfile(char *action, int oflag) /* {{{ */
359 {
360   int fd;
361   const char *file;
362   char *file_copy, *dir;
363
364   file = (config_pid_file != NULL)
365     ? config_pid_file
366     : LOCALSTATEDIR "/run/rrdcached.pid";
367
368   /* dirname may modify its argument */
369   file_copy = strdup(file);
370   if (file_copy == NULL)
371   {
372     fprintf(stderr, "rrdcached: strdup(): %s\n",
373         rrd_strerror(errno));
374     return -1;
375   }
376
377   dir = dirname(file_copy);
378   if (rrd_mkdir_p(dir, 0777) != 0)
379   {
380     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381         dir, rrd_strerror(errno));
382     return -1;
383   }
384
385   free(file_copy);
386
387   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
388   if (fd < 0)
389     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390             action, file, rrd_strerror(errno));
391
392   return(fd);
393 } /* }}} static int open_pidfile */
394
395 /* check existing pid file to see whether a daemon is running */
396 static int check_pidfile(void)
397 {
398   int pid_fd;
399   pid_t pid;
400   char pid_str[16];
401
402   pid_fd = open_pidfile("open", O_RDWR);
403   if (pid_fd < 0)
404     return pid_fd;
405
406   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
407     return -1;
408
409   pid = atoi(pid_str);
410   if (pid <= 0)
411     return -1;
412
413   /* another running process that we can signal COULD be
414    * a competing rrdcached */
415   if (pid != getpid() && kill(pid, 0) == 0)
416   {
417     fprintf(stderr,
418             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
419     close(pid_fd);
420     return -1;
421   }
422
423   lseek(pid_fd, 0, SEEK_SET);
424   if (ftruncate(pid_fd, 0) == -1)
425   {
426     fprintf(stderr,
427             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
431
432   fprintf(stderr,
433           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434           "rrdcached: starting normally.\n", pid);
435
436   return pid_fd;
437 } /* }}} static int check_pidfile */
438
439 static int write_pidfile (int fd) /* {{{ */
440 {
441   pid_t pid;
442   FILE *fh;
443
444   pid = getpid ();
445
446   fh = fdopen (fd, "w");
447   if (fh == NULL)
448   {
449     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
450     close(fd);
451     return (-1);
452   }
453
454   fprintf (fh, "%i\n", (int) pid);
455   fclose (fh);
456
457   return (0);
458 } /* }}} int write_pidfile */
459
460 static int remove_pidfile (void) /* {{{ */
461 {
462   char *file;
463   int status;
464
465   file = (config_pid_file != NULL)
466     ? config_pid_file
467     : LOCALSTATEDIR "/run/rrdcached.pid";
468
469   status = unlink (file);
470   if (status == 0)
471     return (0);
472   return (errno);
473 } /* }}} int remove_pidfile */
474
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
476 {
477   char *eol;
478
479   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480                sock->next_read - sock->next_cmd);
481
482   if (eol == NULL)
483   {
484     /* no commands left, move remainder back to front of rbuf */
485     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486             sock->next_read - sock->next_cmd);
487     sock->next_read -= sock->next_cmd;
488     sock->next_cmd = 0;
489     *len = 0;
490     return NULL;
491   }
492   else
493   {
494     char *cmd = sock->rbuf + sock->next_cmd;
495     *eol = '\0';
496
497     sock->next_cmd = eol - sock->rbuf + 1;
498
499     if (eol > sock->rbuf && *(eol-1) == '\r')
500       *(--eol) = '\0'; /* handle "\r\n" EOL */
501
502     *len = eol - cmd;
503
504     return cmd;
505   }
506
507   /* NOTREACHED */
508   assert(1==0);
509 } /* }}} char *next_cmd */
510
511 /* add the characters directly to the write buffer */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
513 {
514   char *new_buf;
515
516   assert(sock != NULL);
517
518   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
519   if (new_buf == NULL)
520   {
521     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
522     return -1;
523   }
524
525   strncpy(new_buf + sock->wbuf_len, str, len + 1);
526
527   sock->wbuf = new_buf;
528   sock->wbuf_len += len;
529
530   return 0;
531 } /* }}} static int add_to_wbuf */
532
533 /* add the text to the "extra" info that's sent after the status line */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
535 {
536   va_list argp;
537   char buffer[CMD_MAX];
538   int len;
539
540   if (JOURNAL_REPLAY(sock)) return 0;
541   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
542
543   va_start(argp, fmt);
544 #ifdef HAVE_VSNPRINTF
545   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
546 #else
547   len = vsprintf(buffer, fmt, argp);
548 #endif
549   va_end(argp);
550   if (len < 0)
551   {
552     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
553     return -1;
554   }
555
556   return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
558
559 static int count_lines(char *str) /* {{{ */
560 {
561   int lines = 0;
562
563   if (str != NULL)
564   {
565     while ((str = strchr(str, '\n')) != NULL)
566     {
567       ++lines;
568       ++str;
569     }
570   }
571
572   return lines;
573 } /* }}} static int count_lines */
574
575 /* send the response back to the user.
576  * returns 0 on success, -1 on error
577  * write buffer is always zeroed after this call */
578 static int send_response (listen_socket_t *sock, response_code rc,
579                           char *fmt, ...) /* {{{ */
580 {
581   va_list argp;
582   char buffer[CMD_MAX];
583   int lines;
584   ssize_t wrote;
585   int rclen, len;
586
587   if (JOURNAL_REPLAY(sock)) return rc;
588
589   if (sock->batch_start)
590   {
591     if (rc == RESP_OK)
592       return rc; /* no response on success during BATCH */
593     lines = sock->batch_cmd;
594   }
595   else if (rc == RESP_OK)
596     lines = count_lines(sock->wbuf);
597   else
598     lines = -1;
599
600   rclen = sprintf(buffer, "%d ", lines);
601   va_start(argp, fmt);
602 #ifdef HAVE_VSNPRINTF
603   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
604 #else
605   len = vsprintf(buffer+rclen, fmt, argp);
606 #endif
607   va_end(argp);
608   if (len < 0)
609     return -1;
610
611   len += rclen;
612
613   /* append the result to the wbuf, don't write to the user */
614   if (sock->batch_start)
615     return add_to_wbuf(sock, buffer, len);
616
617   /* first write must be complete */
618   if (len != write(sock->fd, buffer, len))
619   {
620     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
621     return -1;
622   }
623
624   if (sock->wbuf != NULL && rc == RESP_OK)
625   {
626     wrote = 0;
627     while (wrote < sock->wbuf_len)
628     {
629       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
630       if (wb <= 0)
631       {
632         RRDD_LOG(LOG_INFO, "send_response: could not write results");
633         return -1;
634       }
635       wrote += wb;
636     }
637   }
638
639   free(sock->wbuf); sock->wbuf = NULL;
640   sock->wbuf_len = 0;
641
642   return 0;
643 } /* }}} */
644
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
646 {
647   ci->values = NULL;
648   ci->values_num = 0;
649   ci->values_alloc = 0;
650
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
654 }
655
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
661 {
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
664
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
669
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
674
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
677
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
682
683 } /* }}} static void remove_from_queue */
684
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
689 {
690   if (ci == NULL) return NULL;
691
692   remove_from_queue(ci);
693
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
696
697   free (ci->values);
698   free (ci->file);
699
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
703
704   free (ci);
705
706   return NULL;
707 } /* }}} static void *free_cache_item */
708
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
715 {
716   if (ci == NULL)
717     return (-1);
718
719   if (ci->values_num == 0)
720     return (0);
721
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
726
727     /* remove if further down in queue */
728     remove_from_queue(ci);
729
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
735
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
744
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
747
748     ci->prev = cache_queue_tail;
749
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
754
755     cache_queue_tail = ci;
756   }
757
758   ci->flags |= CI_FLAGS_IN_QUEUE;
759
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
764
765   return (0);
766 } /* }}} int enqueue_cache_item */
767
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
775 {
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
778
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
781
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
784
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
800
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
803
804 static int flush_old_values (int max_age)
805 {
806   callback_flush_data_t cfd;
807   size_t k;
808
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
815
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
820
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
826
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
834
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
840
841   return (0);
842 } /* int flush_old_values */
843
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
845 {
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
849
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
853
854   pthread_mutex_lock(&cache_lock);
855
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
864
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
867
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
871
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
878
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
886
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
889
890   state = SHUTDOWN;
891
892   pthread_mutex_unlock(&cache_lock);
893
894   return NULL;
895 } /* void *flush_thread_main */
896
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
898 {
899   pthread_mutex_lock (&cache_lock);
900
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
909
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
921
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
926
927     ci = cache_queue_head;
928
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
936
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
939
940     values = ci->values;
941     values_num = ci->values_num;
942
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
945
946     pthread_mutex_unlock (&cache_lock);
947
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
956
957     journal_write("wrote", file);
958
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
966
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
974
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
977
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
981
982   return (NULL);
983 } /* }}} void *queue_thread_main */
984
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
987 {
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
994
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1000
1001   if (buffer_size <= 0)
1002     return (-1);
1003
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1006
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1037
1038   if (status != 0)
1039     return (status);
1040
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1044
1045   return (0);
1046 } /* }}} int buffer_get_field */
1047
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1053 {
1054   assert(file != NULL);
1055
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1060
1061   if (strstr(file, "../") != NULL) goto err;
1062
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1065
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1070
1071   return 1;
1072
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1076
1077   return 0;
1078 } /* }}} static int check_file_access */
1079
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1089 {
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1092
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1095
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1099
1100 static int flush_file (const char *filename) /* {{{ */
1101 {
1102   cache_item_t *ci;
1103
1104   pthread_mutex_lock (&cache_lock);
1105
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1112
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1119
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1122
1123   pthread_mutex_unlock(&cache_lock);
1124
1125   return (0);
1126 } /* }}} int flush_file */
1127
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1129 {
1130   char *err = "Syntax error.\n";
1131
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1134
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1137
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1139 {
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1147
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1150
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1160
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1165
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1180
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1182
1183   return (0);
1184 } /* }}} int handle_request_stats */
1185
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1187 {
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1190
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1201
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1204
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1212
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1224
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1228
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1230 {
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1232
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1236
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1239
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1241 {
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1245
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1249
1250   get_abs_path(&file, file_tmp);
1251
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1259
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1262
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1266
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1268 {
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1272
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1276
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1279
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1283
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1288
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1293
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1297
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1299 {
1300   cache_item_t *ci;
1301
1302   pthread_mutex_lock(&cache_lock);
1303
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1310
1311   pthread_mutex_unlock(&cache_lock);
1312
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1315
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1317 {
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[CMD_MAX];
1322
1323   cache_item_t *ci;
1324
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, buffer_size);
1328
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1332
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1336
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1339
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1342
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1347
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1350
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1356
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1370
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1379
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1388
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1392
1393     pthread_mutex_lock(&cache_lock);
1394
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1404
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1410
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1414
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     time_t stamp;
1419     char *eostamp;
1420
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1427
1428     /* make sure update time is always moving forward */
1429     stamp = strtol(value, &eostamp, 10);
1430     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "Cannot find timestamp in '%s'!\n", value);
1435     }
1436     else if (stamp <= ci->last_update_stamp)
1437     {
1438       pthread_mutex_unlock(&cache_lock);
1439       return send_response(sock, RESP_ERR,
1440                            "illegal attempt to update using time %ld when last"
1441                            " update time is %ld (minimum one second step)\n",
1442                            stamp, ci->last_update_stamp);
1443     }
1444     else
1445       ci->last_update_stamp = stamp;
1446
1447     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1448                               &ci->values_alloc, config_alloc_chunk))
1449     {
1450       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1451       continue;
1452     }
1453
1454     values_num++;
1455   }
1456
1457   if (((now - ci->last_flush_time) >= config_write_interval)
1458       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1459       && (ci->values_num > 0))
1460   {
1461     enqueue_cache_item (ci, TAIL);
1462   }
1463
1464   pthread_mutex_unlock (&cache_lock);
1465
1466   if (values_num < 1)
1467     return send_response(sock, RESP_ERR, "No values updated.\n");
1468   else
1469     return send_response(sock, RESP_OK,
1470                          "errors, enqueued %i value(s).\n", values_num);
1471
1472   /* NOTREACHED */
1473   assert(1==0);
1474
1475 } /* }}} int handle_request_update */
1476
1477 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1478 {
1479   char *file, file_tmp[PATH_MAX];
1480   char *cf;
1481
1482   char *start_str;
1483   char *end_str;
1484   time_t start_tm;
1485   time_t end_tm;
1486
1487   unsigned long step;
1488   unsigned long ds_cnt;
1489   char **ds_namv;
1490   rrd_value_t *data;
1491
1492   int status;
1493   unsigned long i;
1494   time_t t;
1495   rrd_value_t *data_ptr;
1496
1497   file = NULL;
1498   cf = NULL;
1499   start_str = NULL;
1500   end_str = NULL;
1501
1502   /* Read the arguments */
1503   do /* while (0) */
1504   {
1505     status = buffer_get_field (&buffer, &buffer_size, &file);
1506     if (status != 0)
1507       break;
1508
1509     status = buffer_get_field (&buffer, &buffer_size, &cf);
1510     if (status != 0)
1511       break;
1512
1513     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1514     if (status != 0)
1515     {
1516       start_str = NULL;
1517       status = 0;
1518       break;
1519     }
1520
1521     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1522     if (status != 0)
1523     {
1524       end_str = NULL;
1525       status = 0;
1526       break;
1527     }
1528   } while (0);
1529
1530   if (status != 0)
1531     return (syntax_error(sock,cmd));
1532
1533   get_abs_path(&file, file_tmp);
1534   if (!check_file_access(file, sock)) return 0;
1535
1536   status = flush_file (file);
1537   if ((status != 0) && (status != ENOENT))
1538     return (send_response (sock, RESP_ERR,
1539           "flush_file (%s) failed with status %i.\n", file, status));
1540
1541   t = time (NULL); /* "now" */
1542
1543   /* Parse start time */
1544   if (start_str != NULL)
1545   {
1546     char *endptr;
1547     long value;
1548
1549     endptr = NULL;
1550     errno = 0;
1551     value = strtol (start_str, &endptr, /* base = */ 0);
1552     if ((endptr == start_str) || (errno != 0))
1553       return (send_response(sock, RESP_ERR,
1554             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1555             start_str));
1556
1557     if (value > 0)
1558       start_tm = (time_t) value;
1559     else
1560       start_tm = (time_t) (t + value);
1561   }
1562   else
1563   {
1564     start_tm = t - 86400;
1565   }
1566
1567   /* Parse end time */
1568   if (end_str != NULL)
1569   {
1570     char *endptr;
1571     long value;
1572
1573     endptr = NULL;
1574     errno = 0;
1575     value = strtol (end_str, &endptr, /* base = */ 0);
1576     if ((endptr == end_str) || (errno != 0))
1577       return (send_response(sock, RESP_ERR,
1578             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1579             end_str));
1580
1581     if (value > 0)
1582       end_tm = (time_t) value;
1583     else
1584       end_tm = (time_t) (t + value);
1585   }
1586   else
1587   {
1588     end_tm = t;
1589   }
1590
1591   step = -1;
1592   ds_cnt = 0;
1593   ds_namv = NULL;
1594   data = NULL;
1595
1596   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1597       &ds_cnt, &ds_namv, &data);
1598   if (status != 0)
1599     return (send_response(sock, RESP_ERR,
1600           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1601
1602   add_response_info (sock, "FlushVersion: %lu\n", 1);
1603   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1604   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1605   add_response_info (sock, "Step: %lu\n", step);
1606   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1607
1608 #define SSTRCAT(buffer,str,buffer_fill) do { \
1609     size_t str_len = strlen (str); \
1610     if ((buffer_fill + str_len) > sizeof (buffer)) \
1611       str_len = sizeof (buffer) - buffer_fill; \
1612     if (str_len > 0) { \
1613       strncpy (buffer + buffer_fill, str, str_len); \
1614       buffer_fill += str_len; \
1615       assert (buffer_fill <= sizeof (buffer)); \
1616       if (buffer_fill == sizeof (buffer)) \
1617         buffer[buffer_fill - 1] = 0; \
1618       else \
1619         buffer[buffer_fill] = 0; \
1620     } \
1621   } while (0)
1622
1623   { /* Add list of DS names */
1624     char linebuf[1024];
1625     size_t linebuf_fill;
1626
1627     memset (linebuf, 0, sizeof (linebuf));
1628     linebuf_fill = 0;
1629     for (i = 0; i < ds_cnt; i++)
1630     {
1631       if (i > 0)
1632         SSTRCAT (linebuf, " ", linebuf_fill);
1633       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1634       rrd_freemem(ds_namv[i]);
1635     }
1636     rrd_freemem(ds_namv);
1637     add_response_info (sock, "DSName: %s\n", linebuf);
1638   }
1639
1640   /* Add the actual data */
1641   assert (step > 0);
1642   data_ptr = data;
1643   for (t = start_tm + step; t <= end_tm; t += step)
1644   {
1645     char linebuf[1024];
1646     size_t linebuf_fill;
1647     char tmp[128];
1648
1649     memset (linebuf, 0, sizeof (linebuf));
1650     linebuf_fill = 0;
1651     for (i = 0; i < ds_cnt; i++)
1652     {
1653       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1654       tmp[sizeof (tmp) - 1] = 0;
1655       SSTRCAT (linebuf, tmp, linebuf_fill);
1656
1657       data_ptr++;
1658     }
1659
1660     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1661   } /* for (t) */
1662   rrd_freemem(data);
1663
1664   return (send_response (sock, RESP_OK, "Success\n"));
1665 #undef SSTRCAT
1666 } /* }}} int handle_request_fetch */
1667
1668 /* we came across a "WROTE" entry during journal replay.
1669  * throw away any values that we have accumulated for this file
1670  */
1671 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1672 {
1673   cache_item_t *ci;
1674   const char *file = buffer;
1675
1676   pthread_mutex_lock(&cache_lock);
1677
1678   ci = g_tree_lookup(cache_tree, file);
1679   if (ci == NULL)
1680   {
1681     pthread_mutex_unlock(&cache_lock);
1682     return (0);
1683   }
1684
1685   if (ci->values)
1686     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1687
1688   wipe_ci_values(ci, now);
1689   remove_from_queue(ci);
1690
1691   pthread_mutex_unlock(&cache_lock);
1692   return (0);
1693 } /* }}} int handle_request_wrote */
1694
1695 /* start "BATCH" processing */
1696 static int batch_start (HANDLER_PROTO) /* {{{ */
1697 {
1698   int status;
1699   if (sock->batch_start)
1700     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1701
1702   status = send_response(sock, RESP_OK,
1703                          "Go ahead.  End with dot '.' on its own line.\n");
1704   sock->batch_start = time(NULL);
1705   sock->batch_cmd = 0;
1706
1707   return status;
1708 } /* }}} static int batch_start */
1709
1710 /* finish "BATCH" processing and return results to the client */
1711 static int batch_done (HANDLER_PROTO) /* {{{ */
1712 {
1713   assert(sock->batch_start);
1714   sock->batch_start = 0;
1715   sock->batch_cmd  = 0;
1716   return send_response(sock, RESP_OK, "errors\n");
1717 } /* }}} static int batch_done */
1718
1719 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1720 {
1721   return -1;
1722 } /* }}} static int handle_request_quit */
1723
1724 static command_t list_of_commands[] = { /* {{{ */
1725   {
1726     "UPDATE",
1727     handle_request_update,
1728     CMD_CONTEXT_ANY,
1729     "UPDATE <filename> <values> [<values> ...]\n"
1730     ,
1731     "Adds the given file to the internal cache if it is not yet known and\n"
1732     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1733     "for details.\n"
1734     "\n"
1735     "Each <values> has the following form:\n"
1736     "  <values> = <time>:<value>[:<value>[...]]\n"
1737     "See the rrdupdate(1) manpage for details.\n"
1738   },
1739   {
1740     "WROTE",
1741     handle_request_wrote,
1742     CMD_CONTEXT_JOURNAL,
1743     NULL,
1744     NULL
1745   },
1746   {
1747     "FLUSH",
1748     handle_request_flush,
1749     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1750     "FLUSH <filename>\n"
1751     ,
1752     "Adds the given filename to the head of the update queue and returns\n"
1753     "after it has been dequeued.\n"
1754   },
1755   {
1756     "FLUSHALL",
1757     handle_request_flushall,
1758     CMD_CONTEXT_CLIENT,
1759     "FLUSHALL\n"
1760     ,
1761     "Triggers writing of all pending updates.  Returns immediately.\n"
1762   },
1763   {
1764     "PENDING",
1765     handle_request_pending,
1766     CMD_CONTEXT_CLIENT,
1767     "PENDING <filename>\n"
1768     ,
1769     "Shows any 'pending' updates for a file, in order.\n"
1770     "The updates shown have not yet been written to the underlying RRD file.\n"
1771   },
1772   {
1773     "FORGET",
1774     handle_request_forget,
1775     CMD_CONTEXT_ANY,
1776     "FORGET <filename>\n"
1777     ,
1778     "Removes the file completely from the cache.\n"
1779     "Any pending updates for the file will be lost.\n"
1780   },
1781   {
1782     "QUEUE",
1783     handle_request_queue,
1784     CMD_CONTEXT_CLIENT,
1785     "QUEUE\n"
1786     ,
1787         "Shows all files in the output queue.\n"
1788     "The output is zero or more lines in the following format:\n"
1789     "(where <num_vals> is the number of values to be written)\n"
1790     "\n"
1791     "<num_vals> <filename>\n"
1792   },
1793   {
1794     "STATS",
1795     handle_request_stats,
1796     CMD_CONTEXT_CLIENT,
1797     "STATS\n"
1798     ,
1799     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1800     "a description of the values.\n"
1801   },
1802   {
1803     "HELP",
1804     handle_request_help,
1805     CMD_CONTEXT_CLIENT,
1806     "HELP [<command>]\n",
1807     NULL, /* special! */
1808   },
1809   {
1810     "BATCH",
1811     batch_start,
1812     CMD_CONTEXT_CLIENT,
1813     "BATCH\n"
1814     ,
1815     "The 'BATCH' command permits the client to initiate a bulk load\n"
1816     "   of commands to rrdcached.\n"
1817     "\n"
1818     "Usage:\n"
1819     "\n"
1820     "    client: BATCH\n"
1821     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1822     "    client: command #1\n"
1823     "    client: command #2\n"
1824     "    client: ... and so on\n"
1825     "    client: .\n"
1826     "    server: 2 errors\n"
1827     "    server: 7 message for command #7\n"
1828     "    server: 9 message for command #9\n"
1829     "\n"
1830     "For more information, consult the rrdcached(1) documentation.\n"
1831   },
1832   {
1833     ".",   /* BATCH terminator */
1834     batch_done,
1835     CMD_CONTEXT_BATCH,
1836     NULL,
1837     NULL
1838   },
1839   {
1840     "FETCH",
1841     handle_request_fetch,
1842     CMD_CONTEXT_CLIENT,
1843     "FETCH <file> <CF> [<start> [<end>]]\n"
1844     ,
1845     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1846   },
1847   {
1848     "QUIT",
1849     handle_request_quit,
1850     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1851     "QUIT\n"
1852     ,
1853     "Disconnect from rrdcached.\n"
1854   }
1855 }; /* }}} command_t list_of_commands[] */
1856 static size_t list_of_commands_len = sizeof (list_of_commands)
1857   / sizeof (list_of_commands[0]);
1858
1859 static command_t *find_command(char *cmd)
1860 {
1861   size_t i;
1862
1863   for (i = 0; i < list_of_commands_len; i++)
1864     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1865       return (&list_of_commands[i]);
1866   return NULL;
1867 }
1868
1869 /* We currently use the index in the `list_of_commands' array as a bit position
1870  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1871  * outside these functions so that switching to a more elegant storage method
1872  * is easily possible. */
1873 static ssize_t find_command_index (const char *cmd) /* {{{ */
1874 {
1875   size_t i;
1876
1877   for (i = 0; i < list_of_commands_len; i++)
1878     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1879       return ((ssize_t) i);
1880   return (-1);
1881 } /* }}} ssize_t find_command_index */
1882
1883 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1884     const char *cmd)
1885 {
1886   ssize_t i;
1887
1888   if (JOURNAL_REPLAY(sock))
1889     return (1);
1890
1891   if (cmd == NULL)
1892     return (-1);
1893
1894   if ((strcasecmp ("QUIT", cmd) == 0)
1895       || (strcasecmp ("HELP", cmd) == 0))
1896     return (1);
1897   else if (strcmp (".", cmd) == 0)
1898     cmd = "BATCH";
1899
1900   i = find_command_index (cmd);
1901   if (i < 0)
1902     return (-1);
1903   assert (i < 32);
1904
1905   if ((sock->permissions & (1 << i)) != 0)
1906     return (1);
1907   return (0);
1908 } /* }}} int socket_permission_check */
1909
1910 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1911     const char *cmd)
1912 {
1913   ssize_t i;
1914
1915   i = find_command_index (cmd);
1916   if (i < 0)
1917     return (-1);
1918   assert (i < 32);
1919
1920   sock->permissions |= (1 << i);
1921   return (0);
1922 } /* }}} int socket_permission_add */
1923
1924 /* check whether commands are received in the expected context */
1925 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1926 {
1927   if (JOURNAL_REPLAY(sock))
1928     return (cmd->context & CMD_CONTEXT_JOURNAL);
1929   else if (sock->batch_start)
1930     return (cmd->context & CMD_CONTEXT_BATCH);
1931   else
1932     return (cmd->context & CMD_CONTEXT_CLIENT);
1933
1934   /* NOTREACHED */
1935   assert(1==0);
1936 }
1937
1938 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1939 {
1940   int status;
1941   char *cmd_str;
1942   char *resp_txt;
1943   command_t *help = NULL;
1944
1945   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1946   if (status == 0)
1947     help = find_command(cmd_str);
1948
1949   if (help && (help->syntax || help->help))
1950   {
1951     char tmp[CMD_MAX];
1952
1953     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1954     resp_txt = tmp;
1955
1956     if (help->syntax)
1957       add_response_info(sock, "Usage: %s\n", help->syntax);
1958
1959     if (help->help)
1960       add_response_info(sock, "%s\n", help->help);
1961   }
1962   else
1963   {
1964     size_t i;
1965
1966     resp_txt = "Command overview\n";
1967
1968     for (i = 0; i < list_of_commands_len; i++)
1969     {
1970       if (list_of_commands[i].syntax == NULL)
1971         continue;
1972       add_response_info (sock, "%s", list_of_commands[i].syntax);
1973     }
1974   }
1975
1976   return send_response(sock, RESP_OK, resp_txt);
1977 } /* }}} int handle_request_help */
1978
1979 static int handle_request (DISPATCH_PROTO) /* {{{ */
1980 {
1981   char *buffer_ptr = buffer;
1982   char *cmd_str = NULL;
1983   command_t *cmd = NULL;
1984   int status;
1985
1986   assert (buffer[buffer_size - 1] == '\0');
1987
1988   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1989   if (status != 0)
1990   {
1991     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1992     return (-1);
1993   }
1994
1995   if (sock != NULL && sock->batch_start)
1996     sock->batch_cmd++;
1997
1998   cmd = find_command(cmd_str);
1999   if (!cmd)
2000     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2001
2002   if (!socket_permission_check (sock, cmd->cmd))
2003     return send_response(sock, RESP_ERR, "Permission denied.\n");
2004
2005   if (!command_check_context(sock, cmd))
2006     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2007
2008   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2009 } /* }}} int handle_request */
2010
2011 static void journal_set_free (journal_set *js) /* {{{ */
2012 {
2013   if (js == NULL)
2014     return;
2015
2016   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2017
2018   free(js);
2019 } /* }}} journal_set_free */
2020
2021 static void journal_set_remove (journal_set *js) /* {{{ */
2022 {
2023   if (js == NULL)
2024     return;
2025
2026   for (uint i=0; i < js->files_num; i++)
2027   {
2028     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2029     unlink(js->files[i]);
2030   }
2031 } /* }}} journal_set_remove */
2032
2033 /* close current journal file handle.
2034  * MUST hold journal_lock before calling */
2035 static void journal_close(void) /* {{{ */
2036 {
2037   if (journal_fh != NULL)
2038   {
2039     if (fclose(journal_fh) != 0)
2040       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2041   }
2042
2043   journal_fh = NULL;
2044   journal_size = 0;
2045 } /* }}} journal_close */
2046
2047 /* MUST hold journal_lock before calling */
2048 static void journal_new_file(void) /* {{{ */
2049 {
2050   struct timeval now;
2051   int  new_fd;
2052   char new_file[PATH_MAX + 1];
2053
2054   assert(journal_dir != NULL);
2055   assert(journal_cur != NULL);
2056
2057   journal_close();
2058
2059   gettimeofday(&now, NULL);
2060   /* this format assures that the files sort in strcmp() order */
2061   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2062            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2063
2064   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2065                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2066   if (new_fd < 0)
2067     goto error;
2068
2069   journal_fh = fdopen(new_fd, "a");
2070   if (journal_fh == NULL)
2071     goto error;
2072
2073   journal_size = ftell(journal_fh);
2074   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2075
2076   /* record the file in the journal set */
2077   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2078
2079   return;
2080
2081 error:
2082   RRDD_LOG(LOG_CRIT,
2083            "JOURNALING DISABLED: Error while trying to create %s : %s",
2084            new_file, rrd_strerror(errno));
2085   RRDD_LOG(LOG_CRIT,
2086            "JOURNALING DISABLED: All values will be flushed at shutdown");
2087
2088   close(new_fd);
2089   config_flush_at_shutdown = 1;
2090
2091 } /* }}} journal_new_file */
2092
2093 /* MUST NOT hold journal_lock before calling this */
2094 static void journal_rotate(void) /* {{{ */
2095 {
2096   journal_set *old_js = NULL;
2097
2098   if (journal_dir == NULL)
2099     return;
2100
2101   RRDD_LOG(LOG_DEBUG, "rotating journals");
2102
2103   pthread_mutex_lock(&stats_lock);
2104   ++stats_journal_rotate;
2105   pthread_mutex_unlock(&stats_lock);
2106
2107   pthread_mutex_lock(&journal_lock);
2108
2109   journal_close();
2110
2111   /* rotate the journal sets */
2112   old_js = journal_old;
2113   journal_old = journal_cur;
2114   journal_cur = calloc(1, sizeof(journal_set));
2115
2116   if (journal_cur != NULL)
2117     journal_new_file();
2118   else
2119     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2120
2121   pthread_mutex_unlock(&journal_lock);
2122
2123   journal_set_remove(old_js);
2124   journal_set_free  (old_js);
2125
2126 } /* }}} static void journal_rotate */
2127
2128 /* MUST hold journal_lock when calling */
2129 static void journal_done(void) /* {{{ */
2130 {
2131   if (journal_cur == NULL)
2132     return;
2133
2134   journal_close();
2135
2136   if (config_flush_at_shutdown)
2137   {
2138     RRDD_LOG(LOG_INFO, "removing journals");
2139     journal_set_remove(journal_old);
2140     journal_set_remove(journal_cur);
2141   }
2142   else
2143   {
2144     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2145              "journals will be used at next startup");
2146   }
2147
2148   journal_set_free(journal_cur);
2149   journal_set_free(journal_old);
2150   free(journal_dir);
2151
2152 } /* }}} static void journal_done */
2153
2154 static int journal_write(char *cmd, char *args) /* {{{ */
2155 {
2156   int chars;
2157
2158   if (journal_fh == NULL)
2159     return 0;
2160
2161   pthread_mutex_lock(&journal_lock);
2162   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2163   journal_size += chars;
2164
2165   if (journal_size > JOURNAL_MAX)
2166     journal_new_file();
2167
2168   pthread_mutex_unlock(&journal_lock);
2169
2170   if (chars > 0)
2171   {
2172     pthread_mutex_lock(&stats_lock);
2173     stats_journal_bytes += chars;
2174     pthread_mutex_unlock(&stats_lock);
2175   }
2176
2177   return chars;
2178 } /* }}} static int journal_write */
2179
2180 static int journal_replay (const char *file) /* {{{ */
2181 {
2182   FILE *fh;
2183   int entry_cnt = 0;
2184   int fail_cnt = 0;
2185   uint64_t line = 0;
2186   char entry[CMD_MAX];
2187   time_t now;
2188
2189   if (file == NULL) return 0;
2190
2191   {
2192     char *reason = "unknown error";
2193     int status = 0;
2194     struct stat statbuf;
2195
2196     memset(&statbuf, 0, sizeof(statbuf));
2197     if (stat(file, &statbuf) != 0)
2198     {
2199       reason = "stat error";
2200       status = errno;
2201     }
2202     else if (!S_ISREG(statbuf.st_mode))
2203     {
2204       reason = "not a regular file";
2205       status = EPERM;
2206     }
2207     if (statbuf.st_uid != daemon_uid)
2208     {
2209       reason = "not owned by daemon user";
2210       status = EACCES;
2211     }
2212     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2213     {
2214       reason = "must not be user/group writable";
2215       status = EACCES;
2216     }
2217
2218     if (status != 0)
2219     {
2220       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2221                file, rrd_strerror(status), reason);
2222       return 0;
2223     }
2224   }
2225
2226   fh = fopen(file, "r");
2227   if (fh == NULL)
2228   {
2229     if (errno != ENOENT)
2230       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2231                file, rrd_strerror(errno));
2232     return 0;
2233   }
2234   else
2235     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2236
2237   now = time(NULL);
2238
2239   while(!feof(fh))
2240   {
2241     size_t entry_len;
2242
2243     ++line;
2244     if (fgets(entry, sizeof(entry), fh) == NULL)
2245       break;
2246     entry_len = strlen(entry);
2247
2248     /* check \n termination in case journal writing crashed mid-line */
2249     if (entry_len == 0)
2250       continue;
2251     else if (entry[entry_len - 1] != '\n')
2252     {
2253       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2254       ++fail_cnt;
2255       continue;
2256     }
2257
2258     entry[entry_len - 1] = '\0';
2259
2260     if (handle_request(NULL, now, entry, entry_len) == 0)
2261       ++entry_cnt;
2262     else
2263       ++fail_cnt;
2264   }
2265
2266   fclose(fh);
2267
2268   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2269            entry_cnt, fail_cnt);
2270
2271   return entry_cnt > 0 ? 1 : 0;
2272 } /* }}} static int journal_replay */
2273
2274 static int journal_sort(const void *v1, const void *v2)
2275 {
2276   char **jn1 = (char **) v1;
2277   char **jn2 = (char **) v2;
2278
2279   return strcmp(*jn1,*jn2);
2280 }
2281
2282 static void journal_init(void) /* {{{ */
2283 {
2284   int had_journal = 0;
2285   DIR *dir;
2286   struct dirent *dent;
2287   char path[PATH_MAX+1];
2288
2289   if (journal_dir == NULL) return;
2290
2291   pthread_mutex_lock(&journal_lock);
2292
2293   journal_cur = calloc(1, sizeof(journal_set));
2294   if (journal_cur == NULL)
2295   {
2296     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2297     return;
2298   }
2299
2300   RRDD_LOG(LOG_INFO, "checking for journal files");
2301
2302   /* Handle old journal files during transition.  This gives them the
2303    * correct sort order.  TODO: remove after first release
2304    */
2305   {
2306     char old_path[PATH_MAX+1];
2307     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2308     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2309     rename(old_path, path);
2310
2311     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2312     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2313     rename(old_path, path);
2314   }
2315
2316   dir = opendir(journal_dir);
2317   if (!dir) {
2318     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2319     return;
2320   }
2321   while ((dent = readdir(dir)) != NULL)
2322   {
2323     /* looks like a journal file? */
2324     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2325       continue;
2326
2327     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2328
2329     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2330     {
2331       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2332                dent->d_name);
2333       break;
2334     }
2335   }
2336   closedir(dir);
2337
2338   qsort(journal_cur->files, journal_cur->files_num,
2339         sizeof(journal_cur->files[0]), journal_sort);
2340
2341   for (uint i=0; i < journal_cur->files_num; i++)
2342     had_journal += journal_replay(journal_cur->files[i]);
2343
2344   journal_new_file();
2345
2346   /* it must have been a crash.  start a flush */
2347   if (had_journal && config_flush_at_shutdown)
2348     flush_old_values(-1);
2349
2350   pthread_mutex_unlock(&journal_lock);
2351
2352   RRDD_LOG(LOG_INFO, "journal processing complete");
2353
2354 } /* }}} static void journal_init */
2355
2356 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2357 {
2358   assert(sock != NULL);
2359
2360   free(sock->rbuf);  sock->rbuf = NULL;
2361   free(sock->wbuf);  sock->wbuf = NULL;
2362   free(sock);
2363 } /* }}} void free_listen_socket */
2364
2365 static void close_connection(listen_socket_t *sock) /* {{{ */
2366 {
2367   if (sock->fd >= 0)
2368   {
2369     close(sock->fd);
2370     sock->fd = -1;
2371   }
2372
2373   free_listen_socket(sock);
2374
2375 } /* }}} void close_connection */
2376
2377 static void *connection_thread_main (void *args) /* {{{ */
2378 {
2379   listen_socket_t *sock;
2380   int fd;
2381
2382   sock = (listen_socket_t *) args;
2383   fd = sock->fd;
2384
2385   /* init read buffers */
2386   sock->next_read = sock->next_cmd = 0;
2387   sock->rbuf = malloc(RBUF_SIZE);
2388   if (sock->rbuf == NULL)
2389   {
2390     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2391     close_connection(sock);
2392     return NULL;
2393   }
2394
2395   pthread_mutex_lock (&connection_threads_lock);
2396   connection_threads_num++;
2397   pthread_mutex_unlock (&connection_threads_lock);
2398
2399   while (state == RUNNING)
2400   {
2401     char *cmd;
2402     ssize_t cmd_len;
2403     ssize_t rbytes;
2404     time_t now;
2405
2406     struct pollfd pollfd;
2407     int status;
2408
2409     pollfd.fd = fd;
2410     pollfd.events = POLLIN | POLLPRI;
2411     pollfd.revents = 0;
2412
2413     status = poll (&pollfd, 1, /* timeout = */ 500);
2414     if (state != RUNNING)
2415       break;
2416     else if (status == 0) /* timeout */
2417       continue;
2418     else if (status < 0) /* error */
2419     {
2420       status = errno;
2421       if (status != EINTR)
2422         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2423       continue;
2424     }
2425
2426     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2427       break;
2428     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2429     {
2430       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2431           "poll(2) returned something unexpected: %#04hx",
2432           pollfd.revents);
2433       break;
2434     }
2435
2436     rbytes = read(fd, sock->rbuf + sock->next_read,
2437                   RBUF_SIZE - sock->next_read);
2438     if (rbytes < 0)
2439     {
2440       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2441       break;
2442     }
2443     else if (rbytes == 0)
2444       break; /* eof */
2445
2446     sock->next_read += rbytes;
2447
2448     if (sock->batch_start)
2449       now = sock->batch_start;
2450     else
2451       now = time(NULL);
2452
2453     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2454     {
2455       status = handle_request (sock, now, cmd, cmd_len+1);
2456       if (status != 0)
2457         goto out_close;
2458     }
2459   }
2460
2461 out_close:
2462   close_connection(sock);
2463
2464   /* Remove this thread from the connection threads list */
2465   pthread_mutex_lock (&connection_threads_lock);
2466   connection_threads_num--;
2467   if (connection_threads_num <= 0)
2468     pthread_cond_broadcast(&connection_threads_done);
2469   pthread_mutex_unlock (&connection_threads_lock);
2470
2471   return (NULL);
2472 } /* }}} void *connection_thread_main */
2473
2474 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2475 {
2476   int fd;
2477   struct sockaddr_un sa;
2478   listen_socket_t *temp;
2479   int status;
2480   const char *path;
2481   char *path_copy, *dir;
2482
2483   path = sock->addr;
2484   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2485     path += strlen("unix:");
2486
2487   /* dirname may modify its argument */
2488   path_copy = strdup(path);
2489   if (path_copy == NULL)
2490   {
2491     fprintf(stderr, "rrdcached: strdup(): %s\n",
2492         rrd_strerror(errno));
2493     return (-1);
2494   }
2495
2496   dir = dirname(path_copy);
2497   if (rrd_mkdir_p(dir, 0777) != 0)
2498   {
2499     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2500         dir, rrd_strerror(errno));
2501     return (-1);
2502   }
2503
2504   free(path_copy);
2505
2506   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2507       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2508   if (temp == NULL)
2509   {
2510     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2511     return (-1);
2512   }
2513   listen_fds = temp;
2514   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2515
2516   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2517   if (fd < 0)
2518   {
2519     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2520              rrd_strerror(errno));
2521     return (-1);
2522   }
2523
2524   memset (&sa, 0, sizeof (sa));
2525   sa.sun_family = AF_UNIX;
2526   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2527
2528   /* if we've gotten this far, we own the pid file.  any daemon started
2529    * with the same args must not be alive.  therefore, ensure that we can
2530    * create the socket...
2531    */
2532   unlink(path);
2533
2534   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2535   if (status != 0)
2536   {
2537     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2538              path, rrd_strerror(errno));
2539     close (fd);
2540     return (-1);
2541   }
2542
2543   /* tweak the sockets group ownership */
2544   if (sock->socket_group != (gid_t)-1)
2545   {
2546     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2547          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2548     {
2549       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2550     }
2551   }
2552
2553   if (sock->socket_permissions != (mode_t)-1)
2554   {
2555     if (chmod(path, sock->socket_permissions) != 0)
2556       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2557           (unsigned int)sock->socket_permissions, strerror(errno));
2558   }
2559
2560   status = listen (fd, /* backlog = */ 10);
2561   if (status != 0)
2562   {
2563     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2564              path, rrd_strerror(errno));
2565     close (fd);
2566     unlink (path);
2567     return (-1);
2568   }
2569
2570   listen_fds[listen_fds_num].fd = fd;
2571   listen_fds[listen_fds_num].family = PF_UNIX;
2572   strncpy(listen_fds[listen_fds_num].addr, path,
2573           sizeof (listen_fds[listen_fds_num].addr) - 1);
2574   listen_fds_num++;
2575
2576   return (0);
2577 } /* }}} int open_listen_socket_unix */
2578
2579 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2580 {
2581   struct addrinfo ai_hints;
2582   struct addrinfo *ai_res;
2583   struct addrinfo *ai_ptr;
2584   char addr_copy[NI_MAXHOST];
2585   char *addr;
2586   char *port;
2587   int status;
2588
2589   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2590   addr_copy[sizeof (addr_copy) - 1] = 0;
2591   addr = addr_copy;
2592
2593   memset (&ai_hints, 0, sizeof (ai_hints));
2594   ai_hints.ai_flags = 0;
2595 #ifdef AI_ADDRCONFIG
2596   ai_hints.ai_flags |= AI_ADDRCONFIG;
2597 #endif
2598   ai_hints.ai_family = AF_UNSPEC;
2599   ai_hints.ai_socktype = SOCK_STREAM;
2600
2601   port = NULL;
2602   if (*addr == '[') /* IPv6+port format */
2603   {
2604     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2605     addr++;
2606
2607     port = strchr (addr, ']');
2608     if (port == NULL)
2609     {
2610       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2611       return (-1);
2612     }
2613     *port = 0;
2614     port++;
2615
2616     if (*port == ':')
2617       port++;
2618     else if (*port == 0)
2619       port = NULL;
2620     else
2621     {
2622       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2623       return (-1);
2624     }
2625   } /* if (*addr == '[') */
2626   else
2627   {
2628     port = rindex(addr, ':');
2629     if (port != NULL)
2630     {
2631       *port = 0;
2632       port++;
2633     }
2634   }
2635   ai_res = NULL;
2636   status = getaddrinfo (addr,
2637                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2638                         &ai_hints, &ai_res);
2639   if (status != 0)
2640   {
2641     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2642              addr, gai_strerror (status));
2643     return (-1);
2644   }
2645
2646   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2647   {
2648     int fd;
2649     listen_socket_t *temp;
2650     int one = 1;
2651
2652     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2653         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2654     if (temp == NULL)
2655     {
2656       fprintf (stderr,
2657                "rrdcached: open_listen_socket_network: realloc failed.\n");
2658       continue;
2659     }
2660     listen_fds = temp;
2661     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2662
2663     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2664     if (fd < 0)
2665     {
2666       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2667                rrd_strerror(errno));
2668       continue;
2669     }
2670
2671     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2672
2673     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2674     if (status != 0)
2675     {
2676       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2677                sock->addr, rrd_strerror(errno));
2678       close (fd);
2679       continue;
2680     }
2681
2682     status = listen (fd, /* backlog = */ 10);
2683     if (status != 0)
2684     {
2685       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2686                sock->addr, rrd_strerror(errno));
2687       close (fd);
2688       freeaddrinfo(ai_res);
2689       return (-1);
2690     }
2691
2692     listen_fds[listen_fds_num].fd = fd;
2693     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2694     listen_fds_num++;
2695   } /* for (ai_ptr) */
2696
2697   freeaddrinfo(ai_res);
2698   return (0);
2699 } /* }}} static int open_listen_socket_network */
2700
2701 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2702 {
2703   assert(sock != NULL);
2704   assert(sock->addr != NULL);
2705
2706   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2707       || sock->addr[0] == '/')
2708     return (open_listen_socket_unix(sock));
2709   else
2710     return (open_listen_socket_network(sock));
2711 } /* }}} int open_listen_socket */
2712
2713 static int close_listen_sockets (void) /* {{{ */
2714 {
2715   size_t i;
2716
2717   for (i = 0; i < listen_fds_num; i++)
2718   {
2719     close (listen_fds[i].fd);
2720
2721     if (listen_fds[i].family == PF_UNIX)
2722       unlink(listen_fds[i].addr);
2723   }
2724
2725   free (listen_fds);
2726   listen_fds = NULL;
2727   listen_fds_num = 0;
2728
2729   return (0);
2730 } /* }}} int close_listen_sockets */
2731
2732 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2733 {
2734   struct pollfd *pollfds;
2735   int pollfds_num;
2736   int status;
2737   int i;
2738
2739   if (listen_fds_num < 1)
2740   {
2741     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2742     return (NULL);
2743   }
2744
2745   pollfds_num = listen_fds_num;
2746   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2747   if (pollfds == NULL)
2748   {
2749     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2750     return (NULL);
2751   }
2752   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2753
2754   RRDD_LOG(LOG_INFO, "listening for connections");
2755
2756   while (state == RUNNING)
2757   {
2758     for (i = 0; i < pollfds_num; i++)
2759     {
2760       pollfds[i].fd = listen_fds[i].fd;
2761       pollfds[i].events = POLLIN | POLLPRI;
2762       pollfds[i].revents = 0;
2763     }
2764
2765     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2766     if (state != RUNNING)
2767       break;
2768     else if (status == 0) /* timeout */
2769       continue;
2770     else if (status < 0) /* error */
2771     {
2772       status = errno;
2773       if (status != EINTR)
2774       {
2775         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2776       }
2777       continue;
2778     }
2779
2780     for (i = 0; i < pollfds_num; i++)
2781     {
2782       listen_socket_t *client_sock;
2783       struct sockaddr_storage client_sa;
2784       socklen_t client_sa_size;
2785       pthread_t tid;
2786       pthread_attr_t attr;
2787
2788       if (pollfds[i].revents == 0)
2789         continue;
2790
2791       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2792       {
2793         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2794             "poll(2) returned something unexpected for listen FD #%i.",
2795             pollfds[i].fd);
2796         continue;
2797       }
2798
2799       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2800       if (client_sock == NULL)
2801       {
2802         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2803         continue;
2804       }
2805       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2806
2807       client_sa_size = sizeof (client_sa);
2808       client_sock->fd = accept (pollfds[i].fd,
2809           (struct sockaddr *) &client_sa, &client_sa_size);
2810       if (client_sock->fd < 0)
2811       {
2812         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2813         free(client_sock);
2814         continue;
2815       }
2816
2817       pthread_attr_init (&attr);
2818       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2819
2820       status = pthread_create (&tid, &attr, connection_thread_main,
2821                                client_sock);
2822       if (status != 0)
2823       {
2824         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2825         close_connection(client_sock);
2826         continue;
2827       }
2828     } /* for (pollfds_num) */
2829   } /* while (state == RUNNING) */
2830
2831   RRDD_LOG(LOG_INFO, "starting shutdown");
2832
2833   close_listen_sockets ();
2834
2835   pthread_mutex_lock (&connection_threads_lock);
2836   while (connection_threads_num > 0)
2837     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2838   pthread_mutex_unlock (&connection_threads_lock);
2839
2840   free(pollfds);
2841
2842   return (NULL);
2843 } /* }}} void *listen_thread_main */
2844
2845 static int daemonize (void) /* {{{ */
2846 {
2847   int pid_fd;
2848   char *base_dir;
2849
2850   daemon_uid = geteuid();
2851
2852   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2853   if (pid_fd < 0)
2854     pid_fd = check_pidfile();
2855   if (pid_fd < 0)
2856     return pid_fd;
2857
2858   /* open all the listen sockets */
2859   if (config_listen_address_list_len > 0)
2860   {
2861     for (size_t i = 0; i < config_listen_address_list_len; i++)
2862       open_listen_socket (config_listen_address_list[i]);
2863
2864     rrd_free_ptrs((void ***) &config_listen_address_list,
2865                   &config_listen_address_list_len);
2866   }
2867   else
2868   {
2869     listen_socket_t sock;
2870     memset(&sock, 0, sizeof(sock));
2871     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2872     open_listen_socket (&sock);
2873   }
2874
2875   if (listen_fds_num < 1)
2876   {
2877     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2878     goto error;
2879   }
2880
2881   if (!stay_foreground)
2882   {
2883     pid_t child;
2884
2885     child = fork ();
2886     if (child < 0)
2887     {
2888       fprintf (stderr, "daemonize: fork(2) failed.\n");
2889       goto error;
2890     }
2891     else if (child > 0)
2892       exit(0);
2893
2894     /* Become session leader */
2895     setsid ();
2896
2897     /* Open the first three file descriptors to /dev/null */
2898     close (2);
2899     close (1);
2900     close (0);
2901
2902     open ("/dev/null", O_RDWR);
2903     if (dup(0) == -1 || dup(0) == -1){
2904         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2905     }
2906   } /* if (!stay_foreground) */
2907
2908   /* Change into the /tmp directory. */
2909   base_dir = (config_base_dir != NULL)
2910     ? config_base_dir
2911     : "/tmp";
2912
2913   if (chdir (base_dir) != 0)
2914   {
2915     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2916     goto error;
2917   }
2918
2919   install_signal_handlers();
2920
2921   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2922   RRDD_LOG(LOG_INFO, "starting up");
2923
2924   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2925                                 (GDestroyNotify) free_cache_item);
2926   if (cache_tree == NULL)
2927   {
2928     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2929     goto error;
2930   }
2931
2932   return write_pidfile (pid_fd);
2933
2934 error:
2935   remove_pidfile();
2936   return -1;
2937 } /* }}} int daemonize */
2938
2939 static int cleanup (void) /* {{{ */
2940 {
2941   pthread_cond_broadcast (&flush_cond);
2942   pthread_join (flush_thread, NULL);
2943
2944   pthread_cond_broadcast (&queue_cond);
2945   for (int i = 0; i < config_queue_threads; i++)
2946     pthread_join (queue_threads[i], NULL);
2947
2948   if (config_flush_at_shutdown)
2949   {
2950     assert(cache_queue_head == NULL);
2951     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2952   }
2953
2954   free(queue_threads);
2955   free(config_base_dir);
2956
2957   pthread_mutex_lock(&cache_lock);
2958   g_tree_destroy(cache_tree);
2959
2960   pthread_mutex_lock(&journal_lock);
2961   journal_done();
2962
2963   RRDD_LOG(LOG_INFO, "goodbye");
2964   closelog ();
2965
2966   remove_pidfile ();
2967   free(config_pid_file);
2968
2969   return (0);
2970 } /* }}} int cleanup */
2971
2972 static int read_options (int argc, char **argv) /* {{{ */
2973 {
2974   int option;
2975   int status = 0;
2976
2977   char **permissions = NULL;
2978   size_t permissions_len = 0;
2979
2980   gid_t  socket_group = (gid_t)-1;
2981   mode_t socket_permissions = (mode_t)-1;
2982
2983   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2984   {
2985     switch (option)
2986     {
2987       case 'g':
2988         stay_foreground=1;
2989         break;
2990
2991       case 'l':
2992       {
2993         listen_socket_t *new;
2994
2995         new = malloc(sizeof(listen_socket_t));
2996         if (new == NULL)
2997         {
2998           fprintf(stderr, "read_options: malloc failed.\n");
2999           return(2);
3000         }
3001         memset(new, 0, sizeof(listen_socket_t));
3002
3003         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3004
3005         /* Add permissions to the socket {{{ */
3006         if (permissions_len != 0)
3007         {
3008           size_t i;
3009           for (i = 0; i < permissions_len; i++)
3010           {
3011             status = socket_permission_add (new, permissions[i]);
3012             if (status != 0)
3013             {
3014               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3015                   "socket failed. Most likely, this permission doesn't "
3016                   "exist. Check your command line.\n", permissions[i]);
3017               status = 4;
3018             }
3019           }
3020         }
3021         else /* if (permissions_len == 0) */
3022         {
3023           /* Add permission for ALL commands to the socket. */
3024           size_t i;
3025           for (i = 0; i < list_of_commands_len; i++)
3026           {
3027             status = socket_permission_add (new, list_of_commands[i].cmd);
3028             if (status != 0)
3029             {
3030               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3031                   "socket failed. This should never happen, ever! Sorry.\n",
3032                   permissions[i]);
3033               status = 4;
3034             }
3035           }
3036         }
3037         /* }}} Done adding permissions. */
3038
3039         new->socket_group = socket_group;
3040         new->socket_permissions = socket_permissions;
3041
3042         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3043                          &config_listen_address_list_len, new))
3044         {
3045           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3046           return (2);
3047         }
3048       }
3049       break;
3050
3051       /* set socket group permissions */
3052       case 's':
3053       {
3054         gid_t group_gid;
3055         struct group *grp;
3056
3057         group_gid = strtoul(optarg, NULL, 10);
3058         if (errno != EINVAL && group_gid>0)
3059         {
3060           /* we were passed a number */
3061           grp = getgrgid(group_gid);
3062         }
3063         else
3064         {
3065           grp = getgrnam(optarg);
3066         }
3067
3068         if (grp)
3069         {
3070           socket_group = grp->gr_gid;
3071         }
3072         else
3073         {
3074           /* no idea what the user wanted... */
3075           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3076           return (5);
3077         }
3078       }
3079       break;
3080
3081       /* set socket file permissions */
3082       case 'm':
3083       {
3084         long  tmp;
3085         char *endptr = NULL;
3086
3087         tmp = strtol (optarg, &endptr, 8);
3088         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3089             || (tmp > 07777) || (tmp < 0)) {
3090           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3091               optarg);
3092           return (5);
3093         }
3094
3095         socket_permissions = (mode_t)tmp;
3096       }
3097       break;
3098
3099       case 'P':
3100       {
3101         char *optcopy;
3102         char *saveptr;
3103         char *dummy;
3104         char *ptr;
3105
3106         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3107
3108         optcopy = strdup (optarg);
3109         dummy = optcopy;
3110         saveptr = NULL;
3111         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3112         {
3113           dummy = NULL;
3114           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3115         }
3116
3117         free (optcopy);
3118       }
3119       break;
3120
3121       case 'f':
3122       {
3123         int temp;
3124
3125         temp = atoi (optarg);
3126         if (temp > 0)
3127           config_flush_interval = temp;
3128         else
3129         {
3130           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3131           status = 3;
3132         }
3133       }
3134       break;
3135
3136       case 'w':
3137       {
3138         int temp;
3139
3140         temp = atoi (optarg);
3141         if (temp > 0)
3142           config_write_interval = temp;
3143         else
3144         {
3145           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3146           status = 2;
3147         }
3148       }
3149       break;
3150
3151       case 'z':
3152       {
3153         int temp;
3154
3155         temp = atoi(optarg);
3156         if (temp > 0)
3157           config_write_jitter = temp;
3158         else
3159         {
3160           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3161           status = 2;
3162         }
3163
3164         break;
3165       }
3166
3167       case 't':
3168       {
3169         int threads;
3170         threads = atoi(optarg);
3171         if (threads >= 1)
3172           config_queue_threads = threads;
3173         else
3174         {
3175           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3176           return 1;
3177         }
3178       }
3179       break;
3180
3181       case 'B':
3182         config_write_base_only = 1;
3183         break;
3184
3185       case 'b':
3186       {
3187         size_t len;
3188         char base_realpath[PATH_MAX];
3189
3190         if (config_base_dir != NULL)
3191           free (config_base_dir);
3192         config_base_dir = strdup (optarg);
3193         if (config_base_dir == NULL)
3194         {
3195           fprintf (stderr, "read_options: strdup failed.\n");
3196           return (3);
3197         }
3198
3199         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3200         {
3201           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3202               config_base_dir, rrd_strerror (errno));
3203           return (3);
3204         }
3205
3206         /* make sure that the base directory is not resolved via
3207          * symbolic links.  this makes some performance-enhancing
3208          * assumptions possible (we don't have to resolve paths
3209          * that start with a "/")
3210          */
3211         if (realpath(config_base_dir, base_realpath) == NULL)
3212         {
3213           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3214               "%s\n", config_base_dir, rrd_strerror(errno));
3215           return 5;
3216         }
3217
3218         len = strlen (config_base_dir);
3219         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3220         {
3221           config_base_dir[len - 1] = 0;
3222           len--;
3223         }
3224
3225         if (len < 1)
3226         {
3227           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3228           return (4);
3229         }
3230
3231         _config_base_dir_len = len;
3232
3233         len = strlen (base_realpath);
3234         while ((len > 0) && (base_realpath[len - 1] == '/'))
3235         {
3236           base_realpath[len - 1] = '\0';
3237           len--;
3238         }
3239
3240         if (strncmp(config_base_dir,
3241                          base_realpath, sizeof(base_realpath)) != 0)
3242         {
3243           fprintf(stderr,
3244                   "Base directory (-b) resolved via file system links!\n"
3245                   "Please consult rrdcached '-b' documentation!\n"
3246                   "Consider specifying the real directory (%s)\n",
3247                   base_realpath);
3248           return 5;
3249         }
3250       }
3251       break;
3252
3253       case 'p':
3254       {
3255         if (config_pid_file != NULL)
3256           free (config_pid_file);
3257         config_pid_file = strdup (optarg);
3258         if (config_pid_file == NULL)
3259         {
3260           fprintf (stderr, "read_options: strdup failed.\n");
3261           return (3);
3262         }
3263       }
3264       break;
3265
3266       case 'F':
3267         config_flush_at_shutdown = 1;
3268         break;
3269
3270       case 'j':
3271       {
3272         char journal_dir_actual[PATH_MAX];
3273         const char *dir;
3274         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3275
3276         status = rrd_mkdir_p(dir, 0777);
3277         if (status != 0)
3278         {
3279           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3280               dir, rrd_strerror(errno));
3281           return 6;
3282         }
3283
3284         if (access(dir, R_OK|W_OK|X_OK) != 0)
3285         {
3286           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3287                   errno ? rrd_strerror(errno) : "");
3288           return 6;
3289         }
3290       }
3291       break;
3292
3293       case 'a':
3294       {
3295         int temp = atoi(optarg);
3296         if (temp > 0)
3297           config_alloc_chunk = temp;
3298         else
3299         {
3300           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3301           return 10;
3302         }
3303       }
3304       break;
3305
3306       case 'h':
3307       case '?':
3308         printf ("RRDCacheD %s\n"
3309             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3310             "\n"
3311             "Usage: rrdcached [options]\n"
3312             "\n"
3313             "Valid options are:\n"
3314             "  -l <address>  Socket address to listen to.\n"
3315             "  -P <perms>    Sets the permissions to assign to all following "
3316                             "sockets\n"
3317             "  -w <seconds>  Interval in which to write data.\n"
3318             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3319             "  -t <threads>  Number of write threads.\n"
3320             "  -f <seconds>  Interval in which to flush dead data.\n"
3321             "  -p <file>     Location of the PID-file.\n"
3322             "  -b <dir>      Base directory to change to.\n"
3323             "  -B            Restrict file access to paths within -b <dir>\n"
3324             "  -g            Do not fork and run in the foreground.\n"
3325             "  -j <dir>      Directory in which to create the journal files.\n"
3326             "  -F            Always flush all updates at shutdown\n"
3327             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3328             "                (the socket will also have read/write permissions "
3329                             "for that group)\n"
3330             "  -m <mode>     File permissions (octal) of all following UNIX "
3331                             "sockets\n"
3332             "  -a <size>     Memory allocation chunk size. Default is 1."
3333             "\n"
3334             "For more information and a detailed description of all options "
3335             "please refer\n"
3336             "to the rrdcached(1) manual page.\n",
3337             VERSION);
3338         if (option == 'h')
3339           status = -1;
3340         else
3341           status = 1;
3342         break;
3343     } /* switch (option) */
3344   } /* while (getopt) */
3345
3346   /* advise the user when values are not sane */
3347   if (config_flush_interval < 2 * config_write_interval)
3348     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3349             " 2x write interval (-w) !\n");
3350   if (config_write_jitter > config_write_interval)
3351     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3352             " write interval (-w) !\n");
3353
3354   if (config_write_base_only && config_base_dir == NULL)
3355     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3356             "  Consult the rrdcached documentation\n");
3357
3358   if (journal_dir == NULL)
3359     config_flush_at_shutdown = 1;
3360
3361   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3362
3363   return (status);
3364 } /* }}} int read_options */
3365
3366 int main (int argc, char **argv)
3367 {
3368   int status;
3369
3370   status = read_options (argc, argv);
3371   if (status != 0)
3372   {
3373     if (status < 0)
3374       status = 0;
3375     return (status);
3376   }
3377
3378   status = daemonize ();
3379   if (status != 0)
3380   {
3381     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3382     return (1);
3383   }
3384
3385   journal_init();
3386
3387   /* start the queue threads */
3388   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3389   if (queue_threads == NULL)
3390   {
3391     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3392     cleanup();
3393     return (1);
3394   }
3395   for (int i = 0; i < config_queue_threads; i++)
3396   {
3397     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3398     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3399     if (status != 0)
3400     {
3401       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3402       cleanup();
3403       return (1);
3404     }
3405   }
3406
3407   /* start the flush thread */
3408   memset(&flush_thread, 0, sizeof(flush_thread));
3409   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3410   if (status != 0)
3411   {
3412     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3413     cleanup();
3414     return (1);
3415   }
3416
3417   listen_thread_main (NULL);
3418   cleanup ();
3419
3420   return (0);
3421 } /* int main */
3422
3423 /*
3424  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3425  */