RRDCacheD: Add the "FETCH" command.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109
110 #include <glib-2.0/glib.h>
111 /* }}} */
112
113 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
114
115 #ifndef __GNUC__
116 # define __attribute__(x) /**/
117 #endif
118
119 /*
120  * Types
121  */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
123
124 struct listen_socket_s
125 {
126   int fd;
127   char addr[PATH_MAX + 1];
128   int family;
129
130   /* state for BATCH processing */
131   time_t batch_start;
132   int batch_cmd;
133
134   /* buffered IO */
135   char *rbuf;
136   off_t next_cmd;
137   off_t next_read;
138
139   char *wbuf;
140   ssize_t wbuf_len;
141
142   uint32_t permissions;
143 };
144 typedef struct listen_socket_s listen_socket_t;
145
146 struct command_s;
147 typedef struct command_s command_t;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command_s {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160
161   char  context;                /* where we expect to see it */
162 #define CMD_CONTEXT_CLIENT      (1<<0)
163 #define CMD_CONTEXT_BATCH       (1<<1)
164 #define CMD_CONTEXT_JOURNAL     (1<<2)
165 #define CMD_CONTEXT_ANY         (0x7f)
166
167   char *syntax;
168   char *help;
169 };
170
171 struct cache_item_s;
172 typedef struct cache_item_s cache_item_t;
173 struct cache_item_s
174 {
175   char *file;
176   char **values;
177   size_t values_num;
178   time_t last_flush_time;
179   time_t last_update_stamp;
180 #define CI_FLAGS_IN_TREE  (1<<0)
181 #define CI_FLAGS_IN_QUEUE (1<<1)
182   int flags;
183   pthread_cond_t  flushed;
184   cache_item_t *prev;
185   cache_item_t *next;
186 };
187
188 struct callback_flush_data_s
189 {
190   time_t now;
191   time_t abs_timeout;
192   char **keys;
193   size_t keys_num;
194 };
195 typedef struct callback_flush_data_s callback_flush_data_t;
196
197 enum queue_side_e
198 {
199   HEAD,
200   TAIL
201 };
202 typedef enum queue_side_e queue_side_t;
203
204 /* describe a set of journal files */
205 typedef struct {
206   char **files;
207   size_t files_num;
208 } journal_set;
209
210 /* max length of socket command or response */
211 #define CMD_MAX 4096
212 #define RBUF_SIZE (CMD_MAX*2)
213
214 /*
215  * Variables
216  */
217 static int stay_foreground = 0;
218 static uid_t daemon_uid;
219
220 static listen_socket_t *listen_fds = NULL;
221 static size_t listen_fds_num = 0;
222
223 enum {
224   RUNNING,              /* normal operation */
225   FLUSHING,             /* flushing remaining values */
226   SHUTDOWN              /* shutting down */
227 } state = RUNNING;
228
229 static pthread_t *queue_threads;
230 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
231 static int config_queue_threads = 4;
232
233 static pthread_t flush_thread;
234 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
235
236 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
237 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
238 static int connection_threads_num = 0;
239
240 /* Cache stuff */
241 static GTree          *cache_tree = NULL;
242 static cache_item_t   *cache_queue_head = NULL;
243 static cache_item_t   *cache_queue_tail = NULL;
244 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
245
246 static int config_write_interval = 300;
247 static int config_write_jitter   = 0;
248 static int config_flush_interval = 3600;
249 static int config_flush_at_shutdown = 0;
250 static char *config_pid_file = NULL;
251 static char *config_base_dir = NULL;
252 static size_t _config_base_dir_len = 0;
253 static int config_write_base_only = 0;
254
255 static listen_socket_t **config_listen_address_list = NULL;
256 static size_t config_listen_address_list_len = 0;
257
258 static uint64_t stats_queue_length = 0;
259 static uint64_t stats_updates_received = 0;
260 static uint64_t stats_flush_received = 0;
261 static uint64_t stats_updates_written = 0;
262 static uint64_t stats_data_sets_written = 0;
263 static uint64_t stats_journal_bytes = 0;
264 static uint64_t stats_journal_rotate = 0;
265 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
266
267 /* Journaled updates */
268 #define JOURNAL_REPLAY(s) ((s) == NULL)
269 #define JOURNAL_BASE "rrd.journal"
270 static journal_set *journal_cur = NULL;
271 static journal_set *journal_old = NULL;
272 static char *journal_dir = NULL;
273 static FILE *journal_fh = NULL;         /* current journal file handle */
274 static long  journal_size = 0;          /* current journal size */
275 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
276 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
277 static int journal_write(char *cmd, char *args);
278 static void journal_done(void);
279 static void journal_rotate(void);
280
281 /* prototypes for forward refernces */
282 static int handle_request_help (HANDLER_PROTO);
283
284 /* 
285  * Functions
286  */
287 static void sig_common (const char *sig) /* {{{ */
288 {
289   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
290   state = FLUSHING;
291   pthread_cond_broadcast(&flush_cond);
292   pthread_cond_broadcast(&queue_cond);
293 } /* }}} void sig_common */
294
295 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
296 {
297   sig_common("INT");
298 } /* }}} void sig_int_handler */
299
300 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
301 {
302   sig_common("TERM");
303 } /* }}} void sig_term_handler */
304
305 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
306 {
307   config_flush_at_shutdown = 1;
308   sig_common("USR1");
309 } /* }}} void sig_usr1_handler */
310
311 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
312 {
313   config_flush_at_shutdown = 0;
314   sig_common("USR2");
315 } /* }}} void sig_usr2_handler */
316
317 static void install_signal_handlers(void) /* {{{ */
318 {
319   /* These structures are static, because `sigaction' behaves weird if the are
320    * overwritten.. */
321   static struct sigaction sa_int;
322   static struct sigaction sa_term;
323   static struct sigaction sa_pipe;
324   static struct sigaction sa_usr1;
325   static struct sigaction sa_usr2;
326
327   /* Install signal handlers */
328   memset (&sa_int, 0, sizeof (sa_int));
329   sa_int.sa_handler = sig_int_handler;
330   sigaction (SIGINT, &sa_int, NULL);
331
332   memset (&sa_term, 0, sizeof (sa_term));
333   sa_term.sa_handler = sig_term_handler;
334   sigaction (SIGTERM, &sa_term, NULL);
335
336   memset (&sa_pipe, 0, sizeof (sa_pipe));
337   sa_pipe.sa_handler = SIG_IGN;
338   sigaction (SIGPIPE, &sa_pipe, NULL);
339
340   memset (&sa_pipe, 0, sizeof (sa_usr1));
341   sa_usr1.sa_handler = sig_usr1_handler;
342   sigaction (SIGUSR1, &sa_usr1, NULL);
343
344   memset (&sa_usr2, 0, sizeof (sa_usr2));
345   sa_usr2.sa_handler = sig_usr2_handler;
346   sigaction (SIGUSR2, &sa_usr2, NULL);
347
348 } /* }}} void install_signal_handlers */
349
350 static int open_pidfile(char *action, int oflag) /* {{{ */
351 {
352   int fd;
353   const char *file;
354   char *file_copy, *dir;
355
356   file = (config_pid_file != NULL)
357     ? config_pid_file
358     : LOCALSTATEDIR "/run/rrdcached.pid";
359
360   /* dirname may modify its argument */
361   file_copy = strdup(file);
362   if (file_copy == NULL)
363   {
364     fprintf(stderr, "rrdcached: strdup(): %s\n",
365         rrd_strerror(errno));
366     return -1;
367   }
368
369   dir = dirname(file_copy);
370   if (rrd_mkdir_p(dir, 0777) != 0)
371   {
372     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
373         dir, rrd_strerror(errno));
374     return -1;
375   }
376
377   free(file_copy);
378
379   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
380   if (fd < 0)
381     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
382             action, file, rrd_strerror(errno));
383
384   return(fd);
385 } /* }}} static int open_pidfile */
386
387 /* check existing pid file to see whether a daemon is running */
388 static int check_pidfile(void)
389 {
390   int pid_fd;
391   pid_t pid;
392   char pid_str[16];
393
394   pid_fd = open_pidfile("open", O_RDWR);
395   if (pid_fd < 0)
396     return pid_fd;
397
398   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
399     return -1;
400
401   pid = atoi(pid_str);
402   if (pid <= 0)
403     return -1;
404
405   /* another running process that we can signal COULD be
406    * a competing rrdcached */
407   if (pid != getpid() && kill(pid, 0) == 0)
408   {
409     fprintf(stderr,
410             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
411     close(pid_fd);
412     return -1;
413   }
414
415   lseek(pid_fd, 0, SEEK_SET);
416   if (ftruncate(pid_fd, 0) == -1)
417   {
418     fprintf(stderr,
419             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
420     close(pid_fd);
421     return -1;
422   }
423
424   fprintf(stderr,
425           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
426           "rrdcached: starting normally.\n", pid);
427
428   return pid_fd;
429 } /* }}} static int check_pidfile */
430
431 static int write_pidfile (int fd) /* {{{ */
432 {
433   pid_t pid;
434   FILE *fh;
435
436   pid = getpid ();
437
438   fh = fdopen (fd, "w");
439   if (fh == NULL)
440   {
441     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
442     close(fd);
443     return (-1);
444   }
445
446   fprintf (fh, "%i\n", (int) pid);
447   fclose (fh);
448
449   return (0);
450 } /* }}} int write_pidfile */
451
452 static int remove_pidfile (void) /* {{{ */
453 {
454   char *file;
455   int status;
456
457   file = (config_pid_file != NULL)
458     ? config_pid_file
459     : LOCALSTATEDIR "/run/rrdcached.pid";
460
461   status = unlink (file);
462   if (status == 0)
463     return (0);
464   return (errno);
465 } /* }}} int remove_pidfile */
466
467 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
468 {
469   char *eol;
470
471   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
472                sock->next_read - sock->next_cmd);
473
474   if (eol == NULL)
475   {
476     /* no commands left, move remainder back to front of rbuf */
477     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
478             sock->next_read - sock->next_cmd);
479     sock->next_read -= sock->next_cmd;
480     sock->next_cmd = 0;
481     *len = 0;
482     return NULL;
483   }
484   else
485   {
486     char *cmd = sock->rbuf + sock->next_cmd;
487     *eol = '\0';
488
489     sock->next_cmd = eol - sock->rbuf + 1;
490
491     if (eol > sock->rbuf && *(eol-1) == '\r')
492       *(--eol) = '\0'; /* handle "\r\n" EOL */
493
494     *len = eol - cmd;
495
496     return cmd;
497   }
498
499   /* NOTREACHED */
500   assert(1==0);
501 } /* }}} char *next_cmd */
502
503 /* add the characters directly to the write buffer */
504 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
505 {
506   char *new_buf;
507
508   assert(sock != NULL);
509
510   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
511   if (new_buf == NULL)
512   {
513     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
514     return -1;
515   }
516
517   strncpy(new_buf + sock->wbuf_len, str, len + 1);
518
519   sock->wbuf = new_buf;
520   sock->wbuf_len += len;
521
522   return 0;
523 } /* }}} static int add_to_wbuf */
524
525 /* add the text to the "extra" info that's sent after the status line */
526 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
527 {
528   va_list argp;
529   char buffer[CMD_MAX];
530   int len;
531
532   if (JOURNAL_REPLAY(sock)) return 0;
533   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
534
535   va_start(argp, fmt);
536 #ifdef HAVE_VSNPRINTF
537   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
538 #else
539   len = vsprintf(buffer, fmt, argp);
540 #endif
541   va_end(argp);
542   if (len < 0)
543   {
544     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
545     return -1;
546   }
547
548   return add_to_wbuf(sock, buffer, len);
549 } /* }}} static int add_response_info */
550
551 static int count_lines(char *str) /* {{{ */
552 {
553   int lines = 0;
554
555   if (str != NULL)
556   {
557     while ((str = strchr(str, '\n')) != NULL)
558     {
559       ++lines;
560       ++str;
561     }
562   }
563
564   return lines;
565 } /* }}} static int count_lines */
566
567 /* send the response back to the user.
568  * returns 0 on success, -1 on error
569  * write buffer is always zeroed after this call */
570 static int send_response (listen_socket_t *sock, response_code rc,
571                           char *fmt, ...) /* {{{ */
572 {
573   va_list argp;
574   char buffer[CMD_MAX];
575   int lines;
576   ssize_t wrote;
577   int rclen, len;
578
579   if (JOURNAL_REPLAY(sock)) return rc;
580
581   if (sock->batch_start)
582   {
583     if (rc == RESP_OK)
584       return rc; /* no response on success during BATCH */
585     lines = sock->batch_cmd;
586   }
587   else if (rc == RESP_OK)
588     lines = count_lines(sock->wbuf);
589   else
590     lines = -1;
591
592   rclen = sprintf(buffer, "%d ", lines);
593   va_start(argp, fmt);
594 #ifdef HAVE_VSNPRINTF
595   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
596 #else
597   len = vsprintf(buffer+rclen, fmt, argp);
598 #endif
599   va_end(argp);
600   if (len < 0)
601     return -1;
602
603   len += rclen;
604
605   /* append the result to the wbuf, don't write to the user */
606   if (sock->batch_start)
607     return add_to_wbuf(sock, buffer, len);
608
609   /* first write must be complete */
610   if (len != write(sock->fd, buffer, len))
611   {
612     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
613     return -1;
614   }
615
616   if (sock->wbuf != NULL && rc == RESP_OK)
617   {
618     wrote = 0;
619     while (wrote < sock->wbuf_len)
620     {
621       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
622       if (wb <= 0)
623       {
624         RRDD_LOG(LOG_INFO, "send_response: could not write results");
625         return -1;
626       }
627       wrote += wb;
628     }
629   }
630
631   free(sock->wbuf); sock->wbuf = NULL;
632   sock->wbuf_len = 0;
633
634   return 0;
635 } /* }}} */
636
637 static void wipe_ci_values(cache_item_t *ci, time_t when)
638 {
639   ci->values = NULL;
640   ci->values_num = 0;
641
642   ci->last_flush_time = when;
643   if (config_write_jitter > 0)
644     ci->last_flush_time += (rrd_random() % config_write_jitter);
645 }
646
647 /* remove_from_queue
648  * remove a "cache_item_t" item from the queue.
649  * must hold 'cache_lock' when calling this
650  */
651 static void remove_from_queue(cache_item_t *ci) /* {{{ */
652 {
653   if (ci == NULL) return;
654   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
655
656   if (ci->prev == NULL)
657     cache_queue_head = ci->next; /* reset head */
658   else
659     ci->prev->next = ci->next;
660
661   if (ci->next == NULL)
662     cache_queue_tail = ci->prev; /* reset the tail */
663   else
664     ci->next->prev = ci->prev;
665
666   ci->next = ci->prev = NULL;
667   ci->flags &= ~CI_FLAGS_IN_QUEUE;
668
669   pthread_mutex_lock (&stats_lock);
670   assert (stats_queue_length > 0);
671   stats_queue_length--;
672   pthread_mutex_unlock (&stats_lock);
673
674 } /* }}} static void remove_from_queue */
675
676 /* free the resources associated with the cache_item_t
677  * must hold cache_lock when calling this function
678  */
679 static void *free_cache_item(cache_item_t *ci) /* {{{ */
680 {
681   if (ci == NULL) return NULL;
682
683   remove_from_queue(ci);
684
685   for (size_t i=0; i < ci->values_num; i++)
686     free(ci->values[i]);
687
688   free (ci->values);
689   free (ci->file);
690
691   /* in case anyone is waiting */
692   pthread_cond_broadcast(&ci->flushed);
693   pthread_cond_destroy(&ci->flushed);
694
695   free (ci);
696
697   return NULL;
698 } /* }}} static void *free_cache_item */
699
700 /*
701  * enqueue_cache_item:
702  * `cache_lock' must be acquired before calling this function!
703  */
704 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
705     queue_side_t side)
706 {
707   if (ci == NULL)
708     return (-1);
709
710   if (ci->values_num == 0)
711     return (0);
712
713   if (side == HEAD)
714   {
715     if (cache_queue_head == ci)
716       return 0;
717
718     /* remove if further down in queue */
719     remove_from_queue(ci);
720
721     ci->prev = NULL;
722     ci->next = cache_queue_head;
723     if (ci->next != NULL)
724       ci->next->prev = ci;
725     cache_queue_head = ci;
726
727     if (cache_queue_tail == NULL)
728       cache_queue_tail = cache_queue_head;
729   }
730   else /* (side == TAIL) */
731   {
732     /* We don't move values back in the list.. */
733     if (ci->flags & CI_FLAGS_IN_QUEUE)
734       return (0);
735
736     assert (ci->next == NULL);
737     assert (ci->prev == NULL);
738
739     ci->prev = cache_queue_tail;
740
741     if (cache_queue_tail == NULL)
742       cache_queue_head = ci;
743     else
744       cache_queue_tail->next = ci;
745
746     cache_queue_tail = ci;
747   }
748
749   ci->flags |= CI_FLAGS_IN_QUEUE;
750
751   pthread_cond_signal(&queue_cond);
752   pthread_mutex_lock (&stats_lock);
753   stats_queue_length++;
754   pthread_mutex_unlock (&stats_lock);
755
756   return (0);
757 } /* }}} int enqueue_cache_item */
758
759 /*
760  * tree_callback_flush:
761  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
762  * while this is in progress.
763  */
764 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
765     gpointer data)
766 {
767   cache_item_t *ci;
768   callback_flush_data_t *cfd;
769
770   ci = (cache_item_t *) value;
771   cfd = (callback_flush_data_t *) data;
772
773   if (ci->flags & CI_FLAGS_IN_QUEUE)
774     return FALSE;
775
776   if (ci->values_num > 0
777       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
778   {
779     enqueue_cache_item (ci, TAIL);
780   }
781   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
782       && (ci->values_num <= 0))
783   {
784     assert ((char *) key == ci->file);
785     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
786     {
787       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
788       return (FALSE);
789     }
790   }
791
792   return (FALSE);
793 } /* }}} gboolean tree_callback_flush */
794
795 static int flush_old_values (int max_age)
796 {
797   callback_flush_data_t cfd;
798   size_t k;
799
800   memset (&cfd, 0, sizeof (cfd));
801   /* Pass the current time as user data so that we don't need to call
802    * `time' for each node. */
803   cfd.now = time (NULL);
804   cfd.keys = NULL;
805   cfd.keys_num = 0;
806
807   if (max_age > 0)
808     cfd.abs_timeout = cfd.now - max_age;
809   else
810     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
811
812   /* `tree_callback_flush' will return the keys of all values that haven't
813    * been touched in the last `config_flush_interval' seconds in `cfd'.
814    * The char*'s in this array point to the same memory as ci->file, so we
815    * don't need to free them separately. */
816   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
817
818   for (k = 0; k < cfd.keys_num; k++)
819   {
820     /* should never fail, since we have held the cache_lock
821      * the entire time */
822     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
823   }
824
825   if (cfd.keys != NULL)
826   {
827     free (cfd.keys);
828     cfd.keys = NULL;
829   }
830
831   return (0);
832 } /* int flush_old_values */
833
834 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
835 {
836   struct timeval now;
837   struct timespec next_flush;
838   int status;
839
840   gettimeofday (&now, NULL);
841   next_flush.tv_sec = now.tv_sec + config_flush_interval;
842   next_flush.tv_nsec = 1000 * now.tv_usec;
843
844   pthread_mutex_lock(&cache_lock);
845
846   while (state == RUNNING)
847   {
848     gettimeofday (&now, NULL);
849     if ((now.tv_sec > next_flush.tv_sec)
850         || ((now.tv_sec == next_flush.tv_sec)
851           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
852     {
853       RRDD_LOG(LOG_DEBUG, "flushing old values");
854
855       /* Determine the time of the next cache flush. */
856       next_flush.tv_sec = now.tv_sec + config_flush_interval;
857
858       /* Flush all values that haven't been written in the last
859        * `config_write_interval' seconds. */
860       flush_old_values (config_write_interval);
861
862       /* unlock the cache while we rotate so we don't block incoming
863        * updates if the fsync() blocks on disk I/O */
864       pthread_mutex_unlock(&cache_lock);
865       journal_rotate();
866       pthread_mutex_lock(&cache_lock);
867     }
868
869     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
870     if (status != 0 && status != ETIMEDOUT)
871     {
872       RRDD_LOG (LOG_ERR, "flush_thread_main: "
873                 "pthread_cond_timedwait returned %i.", status);
874     }
875   }
876
877   if (config_flush_at_shutdown)
878     flush_old_values (-1); /* flush everything */
879
880   state = SHUTDOWN;
881
882   pthread_mutex_unlock(&cache_lock);
883
884   return NULL;
885 } /* void *flush_thread_main */
886
887 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
888 {
889   pthread_mutex_lock (&cache_lock);
890
891   while (state != SHUTDOWN
892          || (cache_queue_head != NULL && config_flush_at_shutdown))
893   {
894     cache_item_t *ci;
895     char *file;
896     char **values;
897     size_t values_num;
898     int status;
899
900     /* Now, check if there's something to store away. If not, wait until
901      * something comes in. */
902     if (cache_queue_head == NULL)
903     {
904       status = pthread_cond_wait (&queue_cond, &cache_lock);
905       if ((status != 0) && (status != ETIMEDOUT))
906       {
907         RRDD_LOG (LOG_ERR, "queue_thread_main: "
908             "pthread_cond_wait returned %i.", status);
909       }
910     }
911
912     /* Check if a value has arrived. This may be NULL if we timed out or there
913      * was an interrupt such as a signal. */
914     if (cache_queue_head == NULL)
915       continue;
916
917     ci = cache_queue_head;
918
919     /* copy the relevant parts */
920     file = strdup (ci->file);
921     if (file == NULL)
922     {
923       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
924       continue;
925     }
926
927     assert(ci->values != NULL);
928     assert(ci->values_num > 0);
929
930     values = ci->values;
931     values_num = ci->values_num;
932
933     wipe_ci_values(ci, time(NULL));
934     remove_from_queue(ci);
935
936     pthread_mutex_unlock (&cache_lock);
937
938     rrd_clear_error ();
939     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
940     if (status != 0)
941     {
942       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
943           "rrd_update_r (%s) failed with status %i. (%s)",
944           file, status, rrd_get_error());
945     }
946
947     journal_write("wrote", file);
948
949     /* Search again in the tree.  It's possible someone issued a "FORGET"
950      * while we were writing the update values. */
951     pthread_mutex_lock(&cache_lock);
952     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
953     if (ci)
954       pthread_cond_broadcast(&ci->flushed);
955     pthread_mutex_unlock(&cache_lock);
956
957     if (status == 0)
958     {
959       pthread_mutex_lock (&stats_lock);
960       stats_updates_written++;
961       stats_data_sets_written += values_num;
962       pthread_mutex_unlock (&stats_lock);
963     }
964
965     rrd_free_ptrs((void ***) &values, &values_num);
966     free(file);
967
968     pthread_mutex_lock (&cache_lock);
969   }
970   pthread_mutex_unlock (&cache_lock);
971
972   return (NULL);
973 } /* }}} void *queue_thread_main */
974
975 static int buffer_get_field (char **buffer_ret, /* {{{ */
976     size_t *buffer_size_ret, char **field_ret)
977 {
978   char *buffer;
979   size_t buffer_pos;
980   size_t buffer_size;
981   char *field;
982   size_t field_size;
983   int status;
984
985   buffer = *buffer_ret;
986   buffer_pos = 0;
987   buffer_size = *buffer_size_ret;
988   field = *buffer_ret;
989   field_size = 0;
990
991   if (buffer_size <= 0)
992     return (-1);
993
994   /* This is ensured by `handle_request'. */
995   assert (buffer[buffer_size - 1] == '\0');
996
997   status = -1;
998   while (buffer_pos < buffer_size)
999   {
1000     /* Check for end-of-field or end-of-buffer */
1001     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1002     {
1003       field[field_size] = 0;
1004       field_size++;
1005       buffer_pos++;
1006       status = 0;
1007       break;
1008     }
1009     /* Handle escaped characters. */
1010     else if (buffer[buffer_pos] == '\\')
1011     {
1012       if (buffer_pos >= (buffer_size - 1))
1013         break;
1014       buffer_pos++;
1015       field[field_size] = buffer[buffer_pos];
1016       field_size++;
1017       buffer_pos++;
1018     }
1019     /* Normal operation */ 
1020     else
1021     {
1022       field[field_size] = buffer[buffer_pos];
1023       field_size++;
1024       buffer_pos++;
1025     }
1026   } /* while (buffer_pos < buffer_size) */
1027
1028   if (status != 0)
1029     return (status);
1030
1031   *buffer_ret = buffer + buffer_pos;
1032   *buffer_size_ret = buffer_size - buffer_pos;
1033   *field_ret = field;
1034
1035   return (0);
1036 } /* }}} int buffer_get_field */
1037
1038 /* if we're restricting writes to the base directory,
1039  * check whether the file falls within the dir
1040  * returns 1 if OK, otherwise 0
1041  */
1042 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1043 {
1044   assert(file != NULL);
1045
1046   if (!config_write_base_only
1047       || JOURNAL_REPLAY(sock)
1048       || config_base_dir == NULL)
1049     return 1;
1050
1051   if (strstr(file, "../") != NULL) goto err;
1052
1053   /* relative paths without "../" are ok */
1054   if (*file != '/') return 1;
1055
1056   /* file must be of the format base + "/" + <1+ char filename> */
1057   if (strlen(file) < _config_base_dir_len + 2) goto err;
1058   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1059   if (*(file + _config_base_dir_len) != '/') goto err;
1060
1061   return 1;
1062
1063 err:
1064   if (sock != NULL && sock->fd >= 0)
1065     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1066
1067   return 0;
1068 } /* }}} static int check_file_access */
1069
1070 /* when using a base dir, convert relative paths to absolute paths.
1071  * if necessary, modifies the "filename" pointer to point
1072  * to the new path created in "tmp".  "tmp" is provided
1073  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1074  *
1075  * this allows us to optimize for the expected case (absolute path)
1076  * with a no-op.
1077  */
1078 static void get_abs_path(char **filename, char *tmp)
1079 {
1080   assert(tmp != NULL);
1081   assert(filename != NULL && *filename != NULL);
1082
1083   if (config_base_dir == NULL || **filename == '/')
1084     return;
1085
1086   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1087   *filename = tmp;
1088 } /* }}} static int get_abs_path */
1089
1090 static int flush_file (const char *filename) /* {{{ */
1091 {
1092   cache_item_t *ci;
1093
1094   pthread_mutex_lock (&cache_lock);
1095
1096   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1097   if (ci == NULL)
1098   {
1099     pthread_mutex_unlock (&cache_lock);
1100     return (ENOENT);
1101   }
1102
1103   if (ci->values_num > 0)
1104   {
1105     /* Enqueue at head */
1106     enqueue_cache_item (ci, HEAD);
1107     pthread_cond_wait(&ci->flushed, &cache_lock);
1108   }
1109
1110   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1111    * may have been purged during our cond_wait() */
1112
1113   pthread_mutex_unlock(&cache_lock);
1114
1115   return (0);
1116 } /* }}} int flush_file */
1117
1118 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1119 {
1120   char *err = "Syntax error.\n";
1121
1122   if (cmd && cmd->syntax)
1123     err = cmd->syntax;
1124
1125   return send_response(sock, RESP_ERR, "Usage: %s", err);
1126 } /* }}} static int syntax_error() */
1127
1128 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1129 {
1130   uint64_t copy_queue_length;
1131   uint64_t copy_updates_received;
1132   uint64_t copy_flush_received;
1133   uint64_t copy_updates_written;
1134   uint64_t copy_data_sets_written;
1135   uint64_t copy_journal_bytes;
1136   uint64_t copy_journal_rotate;
1137
1138   uint64_t tree_nodes_number;
1139   uint64_t tree_depth;
1140
1141   pthread_mutex_lock (&stats_lock);
1142   copy_queue_length       = stats_queue_length;
1143   copy_updates_received   = stats_updates_received;
1144   copy_flush_received     = stats_flush_received;
1145   copy_updates_written    = stats_updates_written;
1146   copy_data_sets_written  = stats_data_sets_written;
1147   copy_journal_bytes      = stats_journal_bytes;
1148   copy_journal_rotate     = stats_journal_rotate;
1149   pthread_mutex_unlock (&stats_lock);
1150
1151   pthread_mutex_lock (&cache_lock);
1152   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1153   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1154   pthread_mutex_unlock (&cache_lock);
1155
1156   add_response_info(sock,
1157                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1158   add_response_info(sock,
1159                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1160   add_response_info(sock,
1161                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1162   add_response_info(sock,
1163                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1164   add_response_info(sock,
1165                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1166   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1167   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1168   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1169   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1170
1171   send_response(sock, RESP_OK, "Statistics follow\n");
1172
1173   return (0);
1174 } /* }}} int handle_request_stats */
1175
1176 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1177 {
1178   char *file, file_tmp[PATH_MAX];
1179   int status;
1180
1181   status = buffer_get_field (&buffer, &buffer_size, &file);
1182   if (status != 0)
1183   {
1184     return syntax_error(sock,cmd);
1185   }
1186   else
1187   {
1188     pthread_mutex_lock(&stats_lock);
1189     stats_flush_received++;
1190     pthread_mutex_unlock(&stats_lock);
1191
1192     get_abs_path(&file, file_tmp);
1193     if (!check_file_access(file, sock)) return 0;
1194
1195     status = flush_file (file);
1196     if (status == 0)
1197       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1198     else if (status == ENOENT)
1199     {
1200       /* no file in our tree; see whether it exists at all */
1201       struct stat statbuf;
1202
1203       memset(&statbuf, 0, sizeof(statbuf));
1204       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1205         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1206       else
1207         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1208     }
1209     else if (status < 0)
1210       return send_response(sock, RESP_ERR, "Internal error.\n");
1211     else
1212       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1213   }
1214
1215   /* NOTREACHED */
1216   assert(1==0);
1217 } /* }}} int handle_request_flush */
1218
1219 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1220 {
1221   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1222
1223   pthread_mutex_lock(&cache_lock);
1224   flush_old_values(-1);
1225   pthread_mutex_unlock(&cache_lock);
1226
1227   return send_response(sock, RESP_OK, "Started flush.\n");
1228 } /* }}} static int handle_request_flushall */
1229
1230 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1231 {
1232   int status;
1233   char *file, file_tmp[PATH_MAX];
1234   cache_item_t *ci;
1235
1236   status = buffer_get_field(&buffer, &buffer_size, &file);
1237   if (status != 0)
1238     return syntax_error(sock,cmd);
1239
1240   get_abs_path(&file, file_tmp);
1241
1242   pthread_mutex_lock(&cache_lock);
1243   ci = g_tree_lookup(cache_tree, file);
1244   if (ci == NULL)
1245   {
1246     pthread_mutex_unlock(&cache_lock);
1247     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1248   }
1249
1250   for (size_t i=0; i < ci->values_num; i++)
1251     add_response_info(sock, "%s\n", ci->values[i]);
1252
1253   pthread_mutex_unlock(&cache_lock);
1254   return send_response(sock, RESP_OK, "updates pending\n");
1255 } /* }}} static int handle_request_pending */
1256
1257 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1258 {
1259   int status;
1260   gboolean found;
1261   char *file, file_tmp[PATH_MAX];
1262
1263   status = buffer_get_field(&buffer, &buffer_size, &file);
1264   if (status != 0)
1265     return syntax_error(sock,cmd);
1266
1267   get_abs_path(&file, file_tmp);
1268   if (!check_file_access(file, sock)) return 0;
1269
1270   pthread_mutex_lock(&cache_lock);
1271   found = g_tree_remove(cache_tree, file);
1272   pthread_mutex_unlock(&cache_lock);
1273
1274   if (found == TRUE)
1275   {
1276     if (!JOURNAL_REPLAY(sock))
1277       journal_write("forget", file);
1278
1279     return send_response(sock, RESP_OK, "Gone!\n");
1280   }
1281   else
1282     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1283
1284   /* NOTREACHED */
1285   assert(1==0);
1286 } /* }}} static int handle_request_forget */
1287
1288 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1289 {
1290   cache_item_t *ci;
1291
1292   pthread_mutex_lock(&cache_lock);
1293
1294   ci = cache_queue_head;
1295   while (ci != NULL)
1296   {
1297     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1298     ci = ci->next;
1299   }
1300
1301   pthread_mutex_unlock(&cache_lock);
1302
1303   return send_response(sock, RESP_OK, "in queue.\n");
1304 } /* }}} int handle_request_queue */
1305
1306 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1307 {
1308   char *file, file_tmp[PATH_MAX];
1309   int values_num = 0;
1310   int status;
1311   char orig_buf[CMD_MAX];
1312
1313   cache_item_t *ci;
1314
1315   /* save it for the journal later */
1316   if (!JOURNAL_REPLAY(sock))
1317     strncpy(orig_buf, buffer, buffer_size);
1318
1319   status = buffer_get_field (&buffer, &buffer_size, &file);
1320   if (status != 0)
1321     return syntax_error(sock,cmd);
1322
1323   pthread_mutex_lock(&stats_lock);
1324   stats_updates_received++;
1325   pthread_mutex_unlock(&stats_lock);
1326
1327   get_abs_path(&file, file_tmp);
1328   if (!check_file_access(file, sock)) return 0;
1329
1330   pthread_mutex_lock (&cache_lock);
1331   ci = g_tree_lookup (cache_tree, file);
1332
1333   if (ci == NULL) /* {{{ */
1334   {
1335     struct stat statbuf;
1336     cache_item_t *tmp;
1337
1338     /* don't hold the lock while we setup; stat(2) might block */
1339     pthread_mutex_unlock(&cache_lock);
1340
1341     memset (&statbuf, 0, sizeof (statbuf));
1342     status = stat (file, &statbuf);
1343     if (status != 0)
1344     {
1345       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1346
1347       status = errno;
1348       if (status == ENOENT)
1349         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1350       else
1351         return send_response(sock, RESP_ERR,
1352                              "stat failed with error %i.\n", status);
1353     }
1354     if (!S_ISREG (statbuf.st_mode))
1355       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1356
1357     if (access(file, R_OK|W_OK) != 0)
1358       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1359                            file, rrd_strerror(errno));
1360
1361     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1362     if (ci == NULL)
1363     {
1364       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1365
1366       return send_response(sock, RESP_ERR, "malloc failed.\n");
1367     }
1368     memset (ci, 0, sizeof (cache_item_t));
1369
1370     ci->file = strdup (file);
1371     if (ci->file == NULL)
1372     {
1373       free (ci);
1374       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1375
1376       return send_response(sock, RESP_ERR, "strdup failed.\n");
1377     }
1378
1379     wipe_ci_values(ci, now);
1380     ci->flags = CI_FLAGS_IN_TREE;
1381     pthread_cond_init(&ci->flushed, NULL);
1382
1383     pthread_mutex_lock(&cache_lock);
1384
1385     /* another UPDATE might have added this entry in the meantime */
1386     tmp = g_tree_lookup (cache_tree, file);
1387     if (tmp == NULL)
1388       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1389     else
1390     {
1391       free_cache_item (ci);
1392       ci = tmp;
1393     }
1394
1395     /* state may have changed while we were unlocked */
1396     if (state == SHUTDOWN)
1397       return -1;
1398   } /* }}} */
1399   assert (ci != NULL);
1400
1401   /* don't re-write updates in replay mode */
1402   if (!JOURNAL_REPLAY(sock))
1403     journal_write("update", orig_buf);
1404
1405   while (buffer_size > 0)
1406   {
1407     char *value;
1408     time_t stamp;
1409     char *eostamp;
1410
1411     status = buffer_get_field (&buffer, &buffer_size, &value);
1412     if (status != 0)
1413     {
1414       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1415       break;
1416     }
1417
1418     /* make sure update time is always moving forward */
1419     stamp = strtol(value, &eostamp, 10);
1420     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1421     {
1422       pthread_mutex_unlock(&cache_lock);
1423       return send_response(sock, RESP_ERR,
1424                            "Cannot find timestamp in '%s'!\n", value);
1425     }
1426     else if (stamp <= ci->last_update_stamp)
1427     {
1428       pthread_mutex_unlock(&cache_lock);
1429       return send_response(sock, RESP_ERR,
1430                            "illegal attempt to update using time %ld when last"
1431                            " update time is %ld (minimum one second step)\n",
1432                            stamp, ci->last_update_stamp);
1433     }
1434     else
1435       ci->last_update_stamp = stamp;
1436
1437     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1438     {
1439       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1440       continue;
1441     }
1442
1443     values_num++;
1444   }
1445
1446   if (((now - ci->last_flush_time) >= config_write_interval)
1447       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1448       && (ci->values_num > 0))
1449   {
1450     enqueue_cache_item (ci, TAIL);
1451   }
1452
1453   pthread_mutex_unlock (&cache_lock);
1454
1455   if (values_num < 1)
1456     return send_response(sock, RESP_ERR, "No values updated.\n");
1457   else
1458     return send_response(sock, RESP_OK,
1459                          "errors, enqueued %i value(s).\n", values_num);
1460
1461   /* NOTREACHED */
1462   assert(1==0);
1463
1464 } /* }}} int handle_request_update */
1465
1466 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1467 {
1468   char *file;
1469   char *cf;
1470
1471   char *start_str;
1472   char *end_str;
1473   rrd_time_value_t start_tv;
1474   rrd_time_value_t end_tv;
1475   time_t start_tm;
1476   time_t end_tm;
1477
1478   unsigned long step;
1479   unsigned long ds_cnt;
1480   char **ds_namv;
1481   rrd_value_t *data;
1482
1483   int status;
1484   unsigned long i;
1485   time_t t;
1486   rrd_value_t *data_ptr;
1487
1488   file = NULL;
1489   cf = NULL;
1490   start_str = NULL;
1491   end_str = NULL;
1492
1493   /* Read the arguments */
1494   do /* while (0) */
1495   {
1496     status = buffer_get_field (&buffer, &buffer_size, &file);
1497     if (status != 0)
1498       break;
1499
1500     status = buffer_get_field (&buffer, &buffer_size, &cf);
1501     if (status != 0)
1502       break;
1503
1504     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1505     if (status != 0)
1506     {
1507       start_str = NULL;
1508       status = 0;
1509       break;
1510     }
1511
1512     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1513     if (status != 0)
1514     {
1515       end_str = NULL;
1516       status = 0;
1517       break;
1518     }
1519   } while (0);
1520
1521   if (status != 0)
1522     return (syntax_error(sock,cmd));
1523
1524   status = flush_file (file);
1525   if ((status != 0) && (status != ENOENT))
1526     return (send_response (sock, RESP_ERR,
1527           "flush_file (%s) failed with status %i.\n", file, status));
1528
1529   /* Parse start time */
1530   if (start_str != NULL)
1531   {
1532     const char *errmsg;
1533
1534     errmsg = rrd_parsetime (start_str, &start_tv);
1535     if (errmsg != NULL)
1536       return (send_response(sock, RESP_ERR,
1537             "Cannot parse start time `%s': %s\n", start_str, errmsg));
1538   }
1539   else
1540     rrd_parsetime ("-86400", &start_tv);
1541
1542   /* Parse end time */
1543   if (end_str != NULL)
1544   {
1545     const char *errmsg;
1546
1547     errmsg = rrd_parsetime (end_str, &end_tv);
1548     if (errmsg != NULL)
1549       return (send_response(sock, RESP_ERR,
1550             "Cannot parse end time `%s': %s\n", end_str, errmsg));
1551   }
1552   else
1553     rrd_parsetime ("now", &end_tv);
1554
1555   start_tm = 0;
1556   end_tm = 0;
1557   status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm);
1558   if (status != 0)
1559     return (send_response(sock, RESP_ERR,
1560           "rrd_proc_start_end failed: %s\n", rrd_get_error ()));
1561
1562   step = -1;
1563   ds_cnt = 0;
1564   ds_namv = NULL;
1565   data = NULL;
1566
1567   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1568       &ds_cnt, &ds_namv, &data);
1569   if (status != 0)
1570     return (send_response(sock, RESP_ERR,
1571           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1572
1573   add_response_info (sock, "FlushVersion: %lu\n", 1);
1574   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1575   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1576   add_response_info (sock, "Step: %lu\n", step);
1577   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1578
1579 #define SSTRCAT(buffer,str,buffer_fill) do { \
1580     size_t str_len = strlen (str); \
1581     if ((buffer_fill + str_len) > sizeof (buffer)) \
1582       str_len = sizeof (buffer) - buffer_fill; \
1583     if (str_len > 0) { \
1584       strncpy (buffer + buffer_fill, str, str_len); \
1585       buffer_fill += str_len; \
1586       assert (buffer_fill <= sizeof (buffer)); \
1587       if (buffer_fill == sizeof (buffer)) \
1588         buffer[buffer_fill - 1] = 0; \
1589       else \
1590         buffer[buffer_fill] = 0; \
1591     } \
1592   } while (0)
1593
1594   { /* Add list of DS names */
1595     char linebuf[1024];
1596     size_t linebuf_fill;
1597
1598     memset (linebuf, 0, sizeof (linebuf));
1599     linebuf_fill = 0;
1600     for (i = 0; i < ds_cnt; i++)
1601     {
1602       if (i > 0)
1603         SSTRCAT (linebuf, " ", linebuf_fill);
1604       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1605     }
1606     add_response_info (sock, "DSName: %s\n", linebuf);
1607   }
1608
1609   /* Add the actual data */
1610   assert (step > 0);
1611   data_ptr = data;
1612   for (t = start_tm + step; t <= end_tm; t += step)
1613   {
1614     char linebuf[1024];
1615     size_t linebuf_fill;
1616     char tmp[128];
1617
1618     memset (linebuf, 0, sizeof (linebuf));
1619     linebuf_fill = 0;
1620     for (i = 0; i < ds_cnt; i++)
1621     {
1622       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1623       tmp[sizeof (tmp) - 1] = 0;
1624       SSTRCAT (linebuf, tmp, linebuf_fill);
1625
1626       data_ptr++;
1627     }
1628
1629     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1630   } /* for (t) */
1631
1632   return (send_response (sock, RESP_OK, "Success\n"));
1633 #undef SSTRCAT
1634 } /* }}} int handle_request_fetch */
1635
1636 /* we came across a "WROTE" entry during journal replay.
1637  * throw away any values that we have accumulated for this file
1638  */
1639 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1640 {
1641   cache_item_t *ci;
1642   const char *file = buffer;
1643
1644   pthread_mutex_lock(&cache_lock);
1645
1646   ci = g_tree_lookup(cache_tree, file);
1647   if (ci == NULL)
1648   {
1649     pthread_mutex_unlock(&cache_lock);
1650     return (0);
1651   }
1652
1653   if (ci->values)
1654     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1655
1656   wipe_ci_values(ci, now);
1657   remove_from_queue(ci);
1658
1659   pthread_mutex_unlock(&cache_lock);
1660   return (0);
1661 } /* }}} int handle_request_wrote */
1662
1663 /* start "BATCH" processing */
1664 static int batch_start (HANDLER_PROTO) /* {{{ */
1665 {
1666   int status;
1667   if (sock->batch_start)
1668     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1669
1670   status = send_response(sock, RESP_OK,
1671                          "Go ahead.  End with dot '.' on its own line.\n");
1672   sock->batch_start = time(NULL);
1673   sock->batch_cmd = 0;
1674
1675   return status;
1676 } /* }}} static int batch_start */
1677
1678 /* finish "BATCH" processing and return results to the client */
1679 static int batch_done (HANDLER_PROTO) /* {{{ */
1680 {
1681   assert(sock->batch_start);
1682   sock->batch_start = 0;
1683   sock->batch_cmd  = 0;
1684   return send_response(sock, RESP_OK, "errors\n");
1685 } /* }}} static int batch_done */
1686
1687 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1688 {
1689   return -1;
1690 } /* }}} static int handle_request_quit */
1691
1692 static command_t list_of_commands[] = { /* {{{ */
1693   {
1694     "UPDATE",
1695     handle_request_update,
1696     CMD_CONTEXT_ANY,
1697     "UPDATE <filename> <values> [<values> ...]\n"
1698     ,
1699     "Adds the given file to the internal cache if it is not yet known and\n"
1700     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1701     "for details.\n"
1702     "\n"
1703     "Each <values> has the following form:\n"
1704     "  <values> = <time>:<value>[:<value>[...]]\n"
1705     "See the rrdupdate(1) manpage for details.\n"
1706   },
1707   {
1708     "WROTE",
1709     handle_request_wrote,
1710     CMD_CONTEXT_JOURNAL,
1711     NULL,
1712     NULL
1713   },
1714   {
1715     "FLUSH",
1716     handle_request_flush,
1717     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1718     "FLUSH <filename>\n"
1719     ,
1720     "Adds the given filename to the head of the update queue and returns\n"
1721     "after it has been dequeued.\n"
1722   },
1723   {
1724     "FLUSHALL",
1725     handle_request_flushall,
1726     CMD_CONTEXT_CLIENT,
1727     "FLUSHALL\n"
1728     ,
1729     "Triggers writing of all pending updates.  Returns immediately.\n"
1730   },
1731   {
1732     "PENDING",
1733     handle_request_pending,
1734     CMD_CONTEXT_CLIENT,
1735     "PENDING <filename>\n"
1736     ,
1737     "Shows any 'pending' updates for a file, in order.\n"
1738     "The updates shown have not yet been written to the underlying RRD file.\n"
1739   },
1740   {
1741     "FORGET",
1742     handle_request_forget,
1743     CMD_CONTEXT_ANY,
1744     "FORGET <filename>\n"
1745     ,
1746     "Removes the file completely from the cache.\n"
1747     "Any pending updates for the file will be lost.\n"
1748   },
1749   {
1750     "QUEUE",
1751     handle_request_queue,
1752     CMD_CONTEXT_CLIENT,
1753     "QUEUE\n"
1754     ,
1755         "Shows all files in the output queue.\n"
1756     "The output is zero or more lines in the following format:\n"
1757     "(where <num_vals> is the number of values to be written)\n"
1758     "\n"
1759     "<num_vals> <filename>\n"
1760   },
1761   {
1762     "STATS",
1763     handle_request_stats,
1764     CMD_CONTEXT_CLIENT,
1765     "STATS\n"
1766     ,
1767     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1768     "a description of the values.\n"
1769   },
1770   {
1771     "HELP",
1772     handle_request_help,
1773     CMD_CONTEXT_CLIENT,
1774     "HELP [<command>]\n",
1775     NULL, /* special! */
1776   },
1777   {
1778     "BATCH",
1779     batch_start,
1780     CMD_CONTEXT_CLIENT,
1781     "BATCH\n"
1782     ,
1783     "The 'BATCH' command permits the client to initiate a bulk load\n"
1784     "   of commands to rrdcached.\n"
1785     "\n"
1786     "Usage:\n"
1787     "\n"
1788     "    client: BATCH\n"
1789     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1790     "    client: command #1\n"
1791     "    client: command #2\n"
1792     "    client: ... and so on\n"
1793     "    client: .\n"
1794     "    server: 2 errors\n"
1795     "    server: 7 message for command #7\n"
1796     "    server: 9 message for command #9\n"
1797     "\n"
1798     "For more information, consult the rrdcached(1) documentation.\n"
1799   },
1800   {
1801     ".",   /* BATCH terminator */
1802     batch_done,
1803     CMD_CONTEXT_BATCH,
1804     NULL,
1805     NULL
1806   },
1807   {
1808     "FETCH",
1809     handle_request_fetch,
1810     CMD_CONTEXT_CLIENT,
1811     "FETCH <file> <CF> [<start> [<end>]]\n"
1812     ,
1813     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1814   },
1815   {
1816     "QUIT",
1817     handle_request_quit,
1818     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1819     "QUIT\n"
1820     ,
1821     "Disconnect from rrdcached.\n"
1822   }
1823 }; /* }}} command_t list_of_commands[] */
1824 static size_t list_of_commands_len = sizeof (list_of_commands)
1825   / sizeof (list_of_commands[0]);
1826
1827 static command_t *find_command(char *cmd)
1828 {
1829   size_t i;
1830
1831   for (i = 0; i < list_of_commands_len; i++)
1832     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1833       return (&list_of_commands[i]);
1834   return NULL;
1835 }
1836
1837 /* We currently use the index in the `list_of_commands' array as a bit position
1838  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1839  * outside these functions so that switching to a more elegant storage method
1840  * is easily possible. */
1841 static ssize_t find_command_index (const char *cmd) /* {{{ */
1842 {
1843   size_t i;
1844
1845   for (i = 0; i < list_of_commands_len; i++)
1846     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1847       return ((ssize_t) i);
1848   return (-1);
1849 } /* }}} ssize_t find_command_index */
1850
1851 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1852     const char *cmd)
1853 {
1854   ssize_t i;
1855
1856   if (JOURNAL_REPLAY(sock))
1857     return (1);
1858
1859   if (cmd == NULL)
1860     return (-1);
1861
1862   if ((strcasecmp ("QUIT", cmd) == 0)
1863       || (strcasecmp ("HELP", cmd) == 0))
1864     return (1);
1865   else if (strcmp (".", cmd) == 0)
1866     cmd = "BATCH";
1867
1868   i = find_command_index (cmd);
1869   if (i < 0)
1870     return (-1);
1871   assert (i < 32);
1872
1873   if ((sock->permissions & (1 << i)) != 0)
1874     return (1);
1875   return (0);
1876 } /* }}} int socket_permission_check */
1877
1878 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1879     const char *cmd)
1880 {
1881   ssize_t i;
1882
1883   i = find_command_index (cmd);
1884   if (i < 0)
1885     return (-1);
1886   assert (i < 32);
1887
1888   sock->permissions |= (1 << i);
1889   return (0);
1890 } /* }}} int socket_permission_add */
1891
1892 /* check whether commands are received in the expected context */
1893 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1894 {
1895   if (JOURNAL_REPLAY(sock))
1896     return (cmd->context & CMD_CONTEXT_JOURNAL);
1897   else if (sock->batch_start)
1898     return (cmd->context & CMD_CONTEXT_BATCH);
1899   else
1900     return (cmd->context & CMD_CONTEXT_CLIENT);
1901
1902   /* NOTREACHED */
1903   assert(1==0);
1904 }
1905
1906 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1907 {
1908   int status;
1909   char *cmd_str;
1910   char *resp_txt;
1911   command_t *help = NULL;
1912
1913   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1914   if (status == 0)
1915     help = find_command(cmd_str);
1916
1917   if (help && (help->syntax || help->help))
1918   {
1919     char tmp[CMD_MAX];
1920
1921     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1922     resp_txt = tmp;
1923
1924     if (help->syntax)
1925       add_response_info(sock, "Usage: %s\n", help->syntax);
1926
1927     if (help->help)
1928       add_response_info(sock, "%s\n", help->help);
1929   }
1930   else
1931   {
1932     size_t i;
1933
1934     resp_txt = "Command overview\n";
1935
1936     for (i = 0; i < list_of_commands_len; i++)
1937     {
1938       if (list_of_commands[i].syntax == NULL)
1939         continue;
1940       add_response_info (sock, "%s", list_of_commands[i].syntax);
1941     }
1942   }
1943
1944   return send_response(sock, RESP_OK, resp_txt);
1945 } /* }}} int handle_request_help */
1946
1947 static int handle_request (DISPATCH_PROTO) /* {{{ */
1948 {
1949   char *buffer_ptr = buffer;
1950   char *cmd_str = NULL;
1951   command_t *cmd = NULL;
1952   int status;
1953
1954   assert (buffer[buffer_size - 1] == '\0');
1955
1956   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1957   if (status != 0)
1958   {
1959     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1960     return (-1);
1961   }
1962
1963   if (sock != NULL && sock->batch_start)
1964     sock->batch_cmd++;
1965
1966   cmd = find_command(cmd_str);
1967   if (!cmd)
1968     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1969
1970   if (!socket_permission_check (sock, cmd->cmd))
1971     return send_response(sock, RESP_ERR, "Permission denied.\n");
1972
1973   if (!command_check_context(sock, cmd))
1974     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1975
1976   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1977 } /* }}} int handle_request */
1978
1979 static void journal_set_free (journal_set *js) /* {{{ */
1980 {
1981   if (js == NULL)
1982     return;
1983
1984   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1985
1986   free(js);
1987 } /* }}} journal_set_free */
1988
1989 static void journal_set_remove (journal_set *js) /* {{{ */
1990 {
1991   if (js == NULL)
1992     return;
1993
1994   for (uint i=0; i < js->files_num; i++)
1995   {
1996     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1997     unlink(js->files[i]);
1998   }
1999 } /* }}} journal_set_remove */
2000
2001 /* close current journal file handle.
2002  * MUST hold journal_lock before calling */
2003 static void journal_close(void) /* {{{ */
2004 {
2005   if (journal_fh != NULL)
2006   {
2007     if (fclose(journal_fh) != 0)
2008       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2009   }
2010
2011   journal_fh = NULL;
2012   journal_size = 0;
2013 } /* }}} journal_close */
2014
2015 /* MUST hold journal_lock before calling */
2016 static void journal_new_file(void) /* {{{ */
2017 {
2018   struct timeval now;
2019   int  new_fd;
2020   char new_file[PATH_MAX + 1];
2021
2022   assert(journal_dir != NULL);
2023   assert(journal_cur != NULL);
2024
2025   journal_close();
2026
2027   gettimeofday(&now, NULL);
2028   /* this format assures that the files sort in strcmp() order */
2029   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2030            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2031
2032   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2033                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2034   if (new_fd < 0)
2035     goto error;
2036
2037   journal_fh = fdopen(new_fd, "a");
2038   if (journal_fh == NULL)
2039     goto error;
2040
2041   journal_size = ftell(journal_fh);
2042   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2043
2044   /* record the file in the journal set */
2045   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2046
2047   return;
2048
2049 error:
2050   RRDD_LOG(LOG_CRIT,
2051            "JOURNALING DISABLED: Error while trying to create %s : %s",
2052            new_file, rrd_strerror(errno));
2053   RRDD_LOG(LOG_CRIT,
2054            "JOURNALING DISABLED: All values will be flushed at shutdown");
2055
2056   close(new_fd);
2057   config_flush_at_shutdown = 1;
2058
2059 } /* }}} journal_new_file */
2060
2061 /* MUST NOT hold journal_lock before calling this */
2062 static void journal_rotate(void) /* {{{ */
2063 {
2064   journal_set *old_js = NULL;
2065
2066   if (journal_dir == NULL)
2067     return;
2068
2069   RRDD_LOG(LOG_DEBUG, "rotating journals");
2070
2071   pthread_mutex_lock(&stats_lock);
2072   ++stats_journal_rotate;
2073   pthread_mutex_unlock(&stats_lock);
2074
2075   pthread_mutex_lock(&journal_lock);
2076
2077   journal_close();
2078
2079   /* rotate the journal sets */
2080   old_js = journal_old;
2081   journal_old = journal_cur;
2082   journal_cur = calloc(1, sizeof(journal_set));
2083
2084   if (journal_cur != NULL)
2085     journal_new_file();
2086   else
2087     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2088
2089   pthread_mutex_unlock(&journal_lock);
2090
2091   journal_set_remove(old_js);
2092   journal_set_free  (old_js);
2093
2094 } /* }}} static void journal_rotate */
2095
2096 /* MUST hold journal_lock when calling */
2097 static void journal_done(void) /* {{{ */
2098 {
2099   if (journal_cur == NULL)
2100     return;
2101
2102   journal_close();
2103
2104   if (config_flush_at_shutdown)
2105   {
2106     RRDD_LOG(LOG_INFO, "removing journals");
2107     journal_set_remove(journal_old);
2108     journal_set_remove(journal_cur);
2109   }
2110   else
2111   {
2112     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2113              "journals will be used at next startup");
2114   }
2115
2116   journal_set_free(journal_cur);
2117   journal_set_free(journal_old);
2118   free(journal_dir);
2119
2120 } /* }}} static void journal_done */
2121
2122 static int journal_write(char *cmd, char *args) /* {{{ */
2123 {
2124   int chars;
2125
2126   if (journal_fh == NULL)
2127     return 0;
2128
2129   pthread_mutex_lock(&journal_lock);
2130   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2131   journal_size += chars;
2132
2133   if (journal_size > JOURNAL_MAX)
2134     journal_new_file();
2135
2136   pthread_mutex_unlock(&journal_lock);
2137
2138   if (chars > 0)
2139   {
2140     pthread_mutex_lock(&stats_lock);
2141     stats_journal_bytes += chars;
2142     pthread_mutex_unlock(&stats_lock);
2143   }
2144
2145   return chars;
2146 } /* }}} static int journal_write */
2147
2148 static int journal_replay (const char *file) /* {{{ */
2149 {
2150   FILE *fh;
2151   int entry_cnt = 0;
2152   int fail_cnt = 0;
2153   uint64_t line = 0;
2154   char entry[CMD_MAX];
2155   time_t now;
2156
2157   if (file == NULL) return 0;
2158
2159   {
2160     char *reason = "unknown error";
2161     int status = 0;
2162     struct stat statbuf;
2163
2164     memset(&statbuf, 0, sizeof(statbuf));
2165     if (stat(file, &statbuf) != 0)
2166     {
2167       reason = "stat error";
2168       status = errno;
2169     }
2170     else if (!S_ISREG(statbuf.st_mode))
2171     {
2172       reason = "not a regular file";
2173       status = EPERM;
2174     }
2175     if (statbuf.st_uid != daemon_uid)
2176     {
2177       reason = "not owned by daemon user";
2178       status = EACCES;
2179     }
2180     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2181     {
2182       reason = "must not be user/group writable";
2183       status = EACCES;
2184     }
2185
2186     if (status != 0)
2187     {
2188       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2189                file, rrd_strerror(status), reason);
2190       return 0;
2191     }
2192   }
2193
2194   fh = fopen(file, "r");
2195   if (fh == NULL)
2196   {
2197     if (errno != ENOENT)
2198       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2199                file, rrd_strerror(errno));
2200     return 0;
2201   }
2202   else
2203     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2204
2205   now = time(NULL);
2206
2207   while(!feof(fh))
2208   {
2209     size_t entry_len;
2210
2211     ++line;
2212     if (fgets(entry, sizeof(entry), fh) == NULL)
2213       break;
2214     entry_len = strlen(entry);
2215
2216     /* check \n termination in case journal writing crashed mid-line */
2217     if (entry_len == 0)
2218       continue;
2219     else if (entry[entry_len - 1] != '\n')
2220     {
2221       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2222       ++fail_cnt;
2223       continue;
2224     }
2225
2226     entry[entry_len - 1] = '\0';
2227
2228     if (handle_request(NULL, now, entry, entry_len) == 0)
2229       ++entry_cnt;
2230     else
2231       ++fail_cnt;
2232   }
2233
2234   fclose(fh);
2235
2236   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2237            entry_cnt, fail_cnt);
2238
2239   return entry_cnt > 0 ? 1 : 0;
2240 } /* }}} static int journal_replay */
2241
2242 static int journal_sort(const void *v1, const void *v2)
2243 {
2244   char **jn1 = (char **) v1;
2245   char **jn2 = (char **) v2;
2246
2247   return strcmp(*jn1,*jn2);
2248 }
2249
2250 static void journal_init(void) /* {{{ */
2251 {
2252   int had_journal = 0;
2253   DIR *dir;
2254   struct dirent *dent;
2255   char path[PATH_MAX+1];
2256
2257   if (journal_dir == NULL) return;
2258
2259   pthread_mutex_lock(&journal_lock);
2260
2261   journal_cur = calloc(1, sizeof(journal_set));
2262   if (journal_cur == NULL)
2263   {
2264     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2265     return;
2266   }
2267
2268   RRDD_LOG(LOG_INFO, "checking for journal files");
2269
2270   /* Handle old journal files during transition.  This gives them the
2271    * correct sort order.  TODO: remove after first release
2272    */
2273   {
2274     char old_path[PATH_MAX+1];
2275     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2276     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2277     rename(old_path, path);
2278
2279     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2280     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2281     rename(old_path, path);
2282   }
2283
2284   dir = opendir(journal_dir);
2285   while ((dent = readdir(dir)) != NULL)
2286   {
2287     /* looks like a journal file? */
2288     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2289       continue;
2290
2291     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2292
2293     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2294     {
2295       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2296                dent->d_name);
2297       break;
2298     }
2299   }
2300   closedir(dir);
2301
2302   qsort(journal_cur->files, journal_cur->files_num,
2303         sizeof(journal_cur->files[0]), journal_sort);
2304
2305   for (uint i=0; i < journal_cur->files_num; i++)
2306     had_journal += journal_replay(journal_cur->files[i]);
2307
2308   journal_new_file();
2309
2310   /* it must have been a crash.  start a flush */
2311   if (had_journal && config_flush_at_shutdown)
2312     flush_old_values(-1);
2313
2314   pthread_mutex_unlock(&journal_lock);
2315
2316   RRDD_LOG(LOG_INFO, "journal processing complete");
2317
2318 } /* }}} static void journal_init */
2319
2320 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2321 {
2322   assert(sock != NULL);
2323
2324   free(sock->rbuf);  sock->rbuf = NULL;
2325   free(sock->wbuf);  sock->wbuf = NULL;
2326   free(sock);
2327 } /* }}} void free_listen_socket */
2328
2329 static void close_connection(listen_socket_t *sock) /* {{{ */
2330 {
2331   if (sock->fd >= 0)
2332   {
2333     close(sock->fd);
2334     sock->fd = -1;
2335   }
2336
2337   free_listen_socket(sock);
2338
2339 } /* }}} void close_connection */
2340
2341 static void *connection_thread_main (void *args) /* {{{ */
2342 {
2343   listen_socket_t *sock;
2344   int fd;
2345
2346   sock = (listen_socket_t *) args;
2347   fd = sock->fd;
2348
2349   /* init read buffers */
2350   sock->next_read = sock->next_cmd = 0;
2351   sock->rbuf = malloc(RBUF_SIZE);
2352   if (sock->rbuf == NULL)
2353   {
2354     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2355     close_connection(sock);
2356     return NULL;
2357   }
2358
2359   pthread_mutex_lock (&connection_threads_lock);
2360   connection_threads_num++;
2361   pthread_mutex_unlock (&connection_threads_lock);
2362
2363   while (state == RUNNING)
2364   {
2365     char *cmd;
2366     ssize_t cmd_len;
2367     ssize_t rbytes;
2368     time_t now;
2369
2370     struct pollfd pollfd;
2371     int status;
2372
2373     pollfd.fd = fd;
2374     pollfd.events = POLLIN | POLLPRI;
2375     pollfd.revents = 0;
2376
2377     status = poll (&pollfd, 1, /* timeout = */ 500);
2378     if (state != RUNNING)
2379       break;
2380     else if (status == 0) /* timeout */
2381       continue;
2382     else if (status < 0) /* error */
2383     {
2384       status = errno;
2385       if (status != EINTR)
2386         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2387       continue;
2388     }
2389
2390     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2391       break;
2392     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2393     {
2394       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2395           "poll(2) returned something unexpected: %#04hx",
2396           pollfd.revents);
2397       break;
2398     }
2399
2400     rbytes = read(fd, sock->rbuf + sock->next_read,
2401                   RBUF_SIZE - sock->next_read);
2402     if (rbytes < 0)
2403     {
2404       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2405       break;
2406     }
2407     else if (rbytes == 0)
2408       break; /* eof */
2409
2410     sock->next_read += rbytes;
2411
2412     if (sock->batch_start)
2413       now = sock->batch_start;
2414     else
2415       now = time(NULL);
2416
2417     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2418     {
2419       status = handle_request (sock, now, cmd, cmd_len+1);
2420       if (status != 0)
2421         goto out_close;
2422     }
2423   }
2424
2425 out_close:
2426   close_connection(sock);
2427
2428   /* Remove this thread from the connection threads list */
2429   pthread_mutex_lock (&connection_threads_lock);
2430   connection_threads_num--;
2431   if (connection_threads_num <= 0)
2432     pthread_cond_broadcast(&connection_threads_done);
2433   pthread_mutex_unlock (&connection_threads_lock);
2434
2435   return (NULL);
2436 } /* }}} void *connection_thread_main */
2437
2438 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2439 {
2440   int fd;
2441   struct sockaddr_un sa;
2442   listen_socket_t *temp;
2443   int status;
2444   const char *path;
2445   char *path_copy, *dir;
2446
2447   path = sock->addr;
2448   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2449     path += strlen("unix:");
2450
2451   /* dirname may modify its argument */
2452   path_copy = strdup(path);
2453   if (path_copy == NULL)
2454   {
2455     fprintf(stderr, "rrdcached: strdup(): %s\n",
2456         rrd_strerror(errno));
2457     return (-1);
2458   }
2459
2460   dir = dirname(path_copy);
2461   if (rrd_mkdir_p(dir, 0777) != 0)
2462   {
2463     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2464         dir, rrd_strerror(errno));
2465     return (-1);
2466   }
2467
2468   free(path_copy);
2469
2470   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2471       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2472   if (temp == NULL)
2473   {
2474     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2475     return (-1);
2476   }
2477   listen_fds = temp;
2478   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2479
2480   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2481   if (fd < 0)
2482   {
2483     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2484              rrd_strerror(errno));
2485     return (-1);
2486   }
2487
2488   memset (&sa, 0, sizeof (sa));
2489   sa.sun_family = AF_UNIX;
2490   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2491
2492   /* if we've gotten this far, we own the pid file.  any daemon started
2493    * with the same args must not be alive.  therefore, ensure that we can
2494    * create the socket...
2495    */
2496   unlink(path);
2497
2498   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2499   if (status != 0)
2500   {
2501     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2502              path, rrd_strerror(errno));
2503     close (fd);
2504     return (-1);
2505   }
2506
2507   status = listen (fd, /* backlog = */ 10);
2508   if (status != 0)
2509   {
2510     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2511              path, rrd_strerror(errno));
2512     close (fd);
2513     unlink (path);
2514     return (-1);
2515   }
2516
2517   listen_fds[listen_fds_num].fd = fd;
2518   listen_fds[listen_fds_num].family = PF_UNIX;
2519   strncpy(listen_fds[listen_fds_num].addr, path,
2520           sizeof (listen_fds[listen_fds_num].addr) - 1);
2521   listen_fds_num++;
2522
2523   return (0);
2524 } /* }}} int open_listen_socket_unix */
2525
2526 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2527 {
2528   struct addrinfo ai_hints;
2529   struct addrinfo *ai_res;
2530   struct addrinfo *ai_ptr;
2531   char addr_copy[NI_MAXHOST];
2532   char *addr;
2533   char *port;
2534   int status;
2535
2536   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2537   addr_copy[sizeof (addr_copy) - 1] = 0;
2538   addr = addr_copy;
2539
2540   memset (&ai_hints, 0, sizeof (ai_hints));
2541   ai_hints.ai_flags = 0;
2542 #ifdef AI_ADDRCONFIG
2543   ai_hints.ai_flags |= AI_ADDRCONFIG;
2544 #endif
2545   ai_hints.ai_family = AF_UNSPEC;
2546   ai_hints.ai_socktype = SOCK_STREAM;
2547
2548   port = NULL;
2549   if (*addr == '[') /* IPv6+port format */
2550   {
2551     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2552     addr++;
2553
2554     port = strchr (addr, ']');
2555     if (port == NULL)
2556     {
2557       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2558       return (-1);
2559     }
2560     *port = 0;
2561     port++;
2562
2563     if (*port == ':')
2564       port++;
2565     else if (*port == 0)
2566       port = NULL;
2567     else
2568     {
2569       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2570       return (-1);
2571     }
2572   } /* if (*addr == '[') */
2573   else
2574   {
2575     port = rindex(addr, ':');
2576     if (port != NULL)
2577     {
2578       *port = 0;
2579       port++;
2580     }
2581   }
2582   ai_res = NULL;
2583   status = getaddrinfo (addr,
2584                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2585                         &ai_hints, &ai_res);
2586   if (status != 0)
2587   {
2588     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2589              addr, gai_strerror (status));
2590     return (-1);
2591   }
2592
2593   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2594   {
2595     int fd;
2596     listen_socket_t *temp;
2597     int one = 1;
2598
2599     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2600         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2601     if (temp == NULL)
2602     {
2603       fprintf (stderr,
2604                "rrdcached: open_listen_socket_network: realloc failed.\n");
2605       continue;
2606     }
2607     listen_fds = temp;
2608     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2609
2610     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2611     if (fd < 0)
2612     {
2613       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2614                rrd_strerror(errno));
2615       continue;
2616     }
2617
2618     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2619
2620     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2621     if (status != 0)
2622     {
2623       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2624                sock->addr, rrd_strerror(errno));
2625       close (fd);
2626       continue;
2627     }
2628
2629     status = listen (fd, /* backlog = */ 10);
2630     if (status != 0)
2631     {
2632       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2633                sock->addr, rrd_strerror(errno));
2634       close (fd);
2635       freeaddrinfo(ai_res);
2636       return (-1);
2637     }
2638
2639     listen_fds[listen_fds_num].fd = fd;
2640     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2641     listen_fds_num++;
2642   } /* for (ai_ptr) */
2643
2644   freeaddrinfo(ai_res);
2645   return (0);
2646 } /* }}} static int open_listen_socket_network */
2647
2648 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2649 {
2650   assert(sock != NULL);
2651   assert(sock->addr != NULL);
2652
2653   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2654       || sock->addr[0] == '/')
2655     return (open_listen_socket_unix(sock));
2656   else
2657     return (open_listen_socket_network(sock));
2658 } /* }}} int open_listen_socket */
2659
2660 static int close_listen_sockets (void) /* {{{ */
2661 {
2662   size_t i;
2663
2664   for (i = 0; i < listen_fds_num; i++)
2665   {
2666     close (listen_fds[i].fd);
2667
2668     if (listen_fds[i].family == PF_UNIX)
2669       unlink(listen_fds[i].addr);
2670   }
2671
2672   free (listen_fds);
2673   listen_fds = NULL;
2674   listen_fds_num = 0;
2675
2676   return (0);
2677 } /* }}} int close_listen_sockets */
2678
2679 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2680 {
2681   struct pollfd *pollfds;
2682   int pollfds_num;
2683   int status;
2684   int i;
2685
2686   if (listen_fds_num < 1)
2687   {
2688     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2689     return (NULL);
2690   }
2691
2692   pollfds_num = listen_fds_num;
2693   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2694   if (pollfds == NULL)
2695   {
2696     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2697     return (NULL);
2698   }
2699   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2700
2701   RRDD_LOG(LOG_INFO, "listening for connections");
2702
2703   while (state == RUNNING)
2704   {
2705     for (i = 0; i < pollfds_num; i++)
2706     {
2707       pollfds[i].fd = listen_fds[i].fd;
2708       pollfds[i].events = POLLIN | POLLPRI;
2709       pollfds[i].revents = 0;
2710     }
2711
2712     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2713     if (state != RUNNING)
2714       break;
2715     else if (status == 0) /* timeout */
2716       continue;
2717     else if (status < 0) /* error */
2718     {
2719       status = errno;
2720       if (status != EINTR)
2721       {
2722         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2723       }
2724       continue;
2725     }
2726
2727     for (i = 0; i < pollfds_num; i++)
2728     {
2729       listen_socket_t *client_sock;
2730       struct sockaddr_storage client_sa;
2731       socklen_t client_sa_size;
2732       pthread_t tid;
2733       pthread_attr_t attr;
2734
2735       if (pollfds[i].revents == 0)
2736         continue;
2737
2738       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2739       {
2740         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2741             "poll(2) returned something unexpected for listen FD #%i.",
2742             pollfds[i].fd);
2743         continue;
2744       }
2745
2746       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2747       if (client_sock == NULL)
2748       {
2749         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2750         continue;
2751       }
2752       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2753
2754       client_sa_size = sizeof (client_sa);
2755       client_sock->fd = accept (pollfds[i].fd,
2756           (struct sockaddr *) &client_sa, &client_sa_size);
2757       if (client_sock->fd < 0)
2758       {
2759         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2760         free(client_sock);
2761         continue;
2762       }
2763
2764       pthread_attr_init (&attr);
2765       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2766
2767       status = pthread_create (&tid, &attr, connection_thread_main,
2768                                client_sock);
2769       if (status != 0)
2770       {
2771         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2772         close_connection(client_sock);
2773         continue;
2774       }
2775     } /* for (pollfds_num) */
2776   } /* while (state == RUNNING) */
2777
2778   RRDD_LOG(LOG_INFO, "starting shutdown");
2779
2780   close_listen_sockets ();
2781
2782   pthread_mutex_lock (&connection_threads_lock);
2783   while (connection_threads_num > 0)
2784     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2785   pthread_mutex_unlock (&connection_threads_lock);
2786
2787   free(pollfds);
2788
2789   return (NULL);
2790 } /* }}} void *listen_thread_main */
2791
2792 static int daemonize (void) /* {{{ */
2793 {
2794   int pid_fd;
2795   char *base_dir;
2796
2797   daemon_uid = geteuid();
2798
2799   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2800   if (pid_fd < 0)
2801     pid_fd = check_pidfile();
2802   if (pid_fd < 0)
2803     return pid_fd;
2804
2805   /* open all the listen sockets */
2806   if (config_listen_address_list_len > 0)
2807   {
2808     for (size_t i = 0; i < config_listen_address_list_len; i++)
2809       open_listen_socket (config_listen_address_list[i]);
2810
2811     rrd_free_ptrs((void ***) &config_listen_address_list,
2812                   &config_listen_address_list_len);
2813   }
2814   else
2815   {
2816     listen_socket_t sock;
2817     memset(&sock, 0, sizeof(sock));
2818     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2819     open_listen_socket (&sock);
2820   }
2821
2822   if (listen_fds_num < 1)
2823   {
2824     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2825     goto error;
2826   }
2827
2828   if (!stay_foreground)
2829   {
2830     pid_t child;
2831
2832     child = fork ();
2833     if (child < 0)
2834     {
2835       fprintf (stderr, "daemonize: fork(2) failed.\n");
2836       goto error;
2837     }
2838     else if (child > 0)
2839       exit(0);
2840
2841     /* Become session leader */
2842     setsid ();
2843
2844     /* Open the first three file descriptors to /dev/null */
2845     close (2);
2846     close (1);
2847     close (0);
2848
2849     open ("/dev/null", O_RDWR);
2850     if (dup(0) == -1 || dup(0) == -1){
2851         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2852     }
2853   } /* if (!stay_foreground) */
2854
2855   /* Change into the /tmp directory. */
2856   base_dir = (config_base_dir != NULL)
2857     ? config_base_dir
2858     : "/tmp";
2859
2860   if (chdir (base_dir) != 0)
2861   {
2862     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2863     goto error;
2864   }
2865
2866   install_signal_handlers();
2867
2868   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2869   RRDD_LOG(LOG_INFO, "starting up");
2870
2871   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2872                                 (GDestroyNotify) free_cache_item);
2873   if (cache_tree == NULL)
2874   {
2875     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2876     goto error;
2877   }
2878
2879   return write_pidfile (pid_fd);
2880
2881 error:
2882   remove_pidfile();
2883   return -1;
2884 } /* }}} int daemonize */
2885
2886 static int cleanup (void) /* {{{ */
2887 {
2888   pthread_cond_broadcast (&flush_cond);
2889   pthread_join (flush_thread, NULL);
2890
2891   pthread_cond_broadcast (&queue_cond);
2892   for (int i = 0; i < config_queue_threads; i++)
2893     pthread_join (queue_threads[i], NULL);
2894
2895   if (config_flush_at_shutdown)
2896   {
2897     assert(cache_queue_head == NULL);
2898     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2899   }
2900
2901   free(queue_threads);
2902   free(config_base_dir);
2903
2904   pthread_mutex_lock(&cache_lock);
2905   g_tree_destroy(cache_tree);
2906
2907   pthread_mutex_lock(&journal_lock);
2908   journal_done();
2909
2910   RRDD_LOG(LOG_INFO, "goodbye");
2911   closelog ();
2912
2913   remove_pidfile ();
2914   free(config_pid_file);
2915
2916   return (0);
2917 } /* }}} int cleanup */
2918
2919 static int read_options (int argc, char **argv) /* {{{ */
2920 {
2921   int option;
2922   int status = 0;
2923
2924   char **permissions = NULL;
2925   size_t permissions_len = 0;
2926
2927   while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2928   {
2929     switch (option)
2930     {
2931       case 'g':
2932         stay_foreground=1;
2933         break;
2934
2935       case 'l':
2936       {
2937         listen_socket_t *new;
2938
2939         new = malloc(sizeof(listen_socket_t));
2940         if (new == NULL)
2941         {
2942           fprintf(stderr, "read_options: malloc failed.\n");
2943           return(2);
2944         }
2945         memset(new, 0, sizeof(listen_socket_t));
2946
2947         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2948
2949         /* Add permissions to the socket {{{ */
2950         if (permissions_len != 0)
2951         {
2952           size_t i;
2953           for (i = 0; i < permissions_len; i++)
2954           {
2955             status = socket_permission_add (new, permissions[i]);
2956             if (status != 0)
2957             {
2958               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2959                   "socket failed. Most likely, this permission doesn't "
2960                   "exist. Check your command line.\n", permissions[i]);
2961               status = 4;
2962             }
2963           }
2964         }
2965         else /* if (permissions_len == 0) */
2966         {
2967           /* Add permission for ALL commands to the socket. */
2968           size_t i;
2969           for (i = 0; i < list_of_commands_len; i++)
2970           {
2971             status = socket_permission_add (new, list_of_commands[i].cmd);
2972             if (status != 0)
2973             {
2974               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2975                   "socket failed. This should never happen, ever! Sorry.\n",
2976                   permissions[i]);
2977               status = 4;
2978             }
2979           }
2980         }
2981         /* }}} Done adding permissions. */
2982
2983         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2984                          &config_listen_address_list_len, new))
2985         {
2986           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2987           return (2);
2988         }
2989       }
2990       break;
2991
2992       case 'P':
2993       {
2994         char *optcopy;
2995         char *saveptr;
2996         char *dummy;
2997         char *ptr;
2998
2999         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3000
3001         optcopy = strdup (optarg);
3002         dummy = optcopy;
3003         saveptr = NULL;
3004         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3005         {
3006           dummy = NULL;
3007           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3008         }
3009
3010         free (optcopy);
3011       }
3012       break;
3013
3014       case 'f':
3015       {
3016         int temp;
3017
3018         temp = atoi (optarg);
3019         if (temp > 0)
3020           config_flush_interval = temp;
3021         else
3022         {
3023           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3024           status = 3;
3025         }
3026       }
3027       break;
3028
3029       case 'w':
3030       {
3031         int temp;
3032
3033         temp = atoi (optarg);
3034         if (temp > 0)
3035           config_write_interval = temp;
3036         else
3037         {
3038           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3039           status = 2;
3040         }
3041       }
3042       break;
3043
3044       case 'z':
3045       {
3046         int temp;
3047
3048         temp = atoi(optarg);
3049         if (temp > 0)
3050           config_write_jitter = temp;
3051         else
3052         {
3053           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3054           status = 2;
3055         }
3056
3057         break;
3058       }
3059
3060       case 't':
3061       {
3062         int threads;
3063         threads = atoi(optarg);
3064         if (threads >= 1)
3065           config_queue_threads = threads;
3066         else
3067         {
3068           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3069           return 1;
3070         }
3071       }
3072       break;
3073
3074       case 'B':
3075         config_write_base_only = 1;
3076         break;
3077
3078       case 'b':
3079       {
3080         size_t len;
3081         char base_realpath[PATH_MAX];
3082
3083         if (config_base_dir != NULL)
3084           free (config_base_dir);
3085         config_base_dir = strdup (optarg);
3086         if (config_base_dir == NULL)
3087         {
3088           fprintf (stderr, "read_options: strdup failed.\n");
3089           return (3);
3090         }
3091
3092         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3093         {
3094           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3095               config_base_dir, rrd_strerror (errno));
3096           return (3);
3097         }
3098
3099         /* make sure that the base directory is not resolved via
3100          * symbolic links.  this makes some performance-enhancing
3101          * assumptions possible (we don't have to resolve paths
3102          * that start with a "/")
3103          */
3104         if (realpath(config_base_dir, base_realpath) == NULL)
3105         {
3106           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3107               "%s\n", config_base_dir, rrd_strerror(errno));
3108           return 5;
3109         }
3110
3111         len = strlen (config_base_dir);
3112         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3113         {
3114           config_base_dir[len - 1] = 0;
3115           len--;
3116         }
3117
3118         if (len < 1)
3119         {
3120           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3121           return (4);
3122         }
3123
3124         _config_base_dir_len = len;
3125
3126         len = strlen (base_realpath);
3127         while ((len > 0) && (base_realpath[len - 1] == '/'))
3128         {
3129           base_realpath[len - 1] = '\0';
3130           len--;
3131         }
3132
3133         if (strncmp(config_base_dir,
3134                          base_realpath, sizeof(base_realpath)) != 0)
3135         {
3136           fprintf(stderr,
3137                   "Base directory (-b) resolved via file system links!\n"
3138                   "Please consult rrdcached '-b' documentation!\n"
3139                   "Consider specifying the real directory (%s)\n",
3140                   base_realpath);
3141           return 5;
3142         }
3143       }
3144       break;
3145
3146       case 'p':
3147       {
3148         if (config_pid_file != NULL)
3149           free (config_pid_file);
3150         config_pid_file = strdup (optarg);
3151         if (config_pid_file == NULL)
3152         {
3153           fprintf (stderr, "read_options: strdup failed.\n");
3154           return (3);
3155         }
3156       }
3157       break;
3158
3159       case 'F':
3160         config_flush_at_shutdown = 1;
3161         break;
3162
3163       case 'j':
3164       {
3165         const char *dir = journal_dir = strdup(optarg);
3166
3167         status = rrd_mkdir_p(dir, 0777);
3168         if (status != 0)
3169         {
3170           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3171               dir, rrd_strerror(errno));
3172           return 6;
3173         }
3174
3175         if (access(dir, R_OK|W_OK|X_OK) != 0)
3176         {
3177           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3178                   errno ? rrd_strerror(errno) : "");
3179           return 6;
3180         }
3181       }
3182       break;
3183
3184       case 'h':
3185       case '?':
3186         printf ("RRDCacheD %s\n"
3187             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3188             "\n"
3189             "Usage: rrdcached [options]\n"
3190             "\n"
3191             "Valid options are:\n"
3192             "  -l <address>  Socket address to listen to.\n"
3193             "  -P <perms>    Sets the permissions to assign to all following "
3194                             "sockets\n"
3195             "  -w <seconds>  Interval in which to write data.\n"
3196             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3197             "  -t <threads>  Number of write threads.\n"
3198             "  -f <seconds>  Interval in which to flush dead data.\n"
3199             "  -p <file>     Location of the PID-file.\n"
3200             "  -b <dir>      Base directory to change to.\n"
3201             "  -B            Restrict file access to paths within -b <dir>\n"
3202             "  -g            Do not fork and run in the foreground.\n"
3203             "  -j <dir>      Directory in which to create the journal files.\n"
3204             "  -F            Always flush all updates at shutdown\n"
3205             "\n"
3206             "For more information and a detailed description of all options "
3207             "please refer\n"
3208             "to the rrdcached(1) manual page.\n",
3209             VERSION);
3210         status = -1;
3211         break;
3212     } /* switch (option) */
3213   } /* while (getopt) */
3214
3215   /* advise the user when values are not sane */
3216   if (config_flush_interval < 2 * config_write_interval)
3217     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3218             " 2x write interval (-w) !\n");
3219   if (config_write_jitter > config_write_interval)
3220     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3221             " write interval (-w) !\n");
3222
3223   if (config_write_base_only && config_base_dir == NULL)
3224     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3225             "  Consult the rrdcached documentation\n");
3226
3227   if (journal_dir == NULL)
3228     config_flush_at_shutdown = 1;
3229
3230   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3231
3232   return (status);
3233 } /* }}} int read_options */
3234
3235 int main (int argc, char **argv)
3236 {
3237   int status;
3238
3239   status = read_options (argc, argv);
3240   if (status != 0)
3241   {
3242     if (status < 0)
3243       status = 0;
3244     return (status);
3245   }
3246
3247   status = daemonize ();
3248   if (status != 0)
3249   {
3250     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3251     return (1);
3252   }
3253
3254   journal_init();
3255
3256   /* start the queue threads */
3257   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3258   if (queue_threads == NULL)
3259   {
3260     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3261     cleanup();
3262     return (1);
3263   }
3264   for (int i = 0; i < config_queue_threads; i++)
3265   {
3266     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3267     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3268     if (status != 0)
3269     {
3270       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3271       cleanup();
3272       return (1);
3273     }
3274   }
3275
3276   /* start the flush thread */
3277   memset(&flush_thread, 0, sizeof(flush_thread));
3278   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3279   if (status != 0)
3280   {
3281     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3282     cleanup();
3283     return (1);
3284   }
3285
3286   listen_thread_main (NULL);
3287   cleanup ();
3288
3289   return (0);
3290 } /* int main */
3291
3292 /*
3293  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3294  */