clarify rrd_daemon code with JOURNAL_REPLAY macro -- kevin
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109
110 #include <glib-2.0/glib.h>
111 /* }}} */
112
113 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
114
115 #ifndef __GNUC__
116 # define __attribute__(x) /**/
117 #endif
118
119 /*
120  * Types
121  */
122 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
123
124 struct listen_socket_s
125 {
126   int fd;
127   char addr[PATH_MAX + 1];
128   int family;
129
130   /* state for BATCH processing */
131   time_t batch_start;
132   int batch_cmd;
133
134   /* buffered IO */
135   char *rbuf;
136   off_t next_cmd;
137   off_t next_read;
138
139   char *wbuf;
140   ssize_t wbuf_len;
141
142   uint32_t permissions;
143 };
144 typedef struct listen_socket_s listen_socket_t;
145
146 struct command_s;
147 typedef struct command_s command_t;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command_s {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160
161   char  context;                /* where we expect to see it */
162 #define CMD_CONTEXT_CLIENT      (1<<0)
163 #define CMD_CONTEXT_BATCH       (1<<1)
164 #define CMD_CONTEXT_JOURNAL     (1<<2)
165 #define CMD_CONTEXT_ANY         (0x7f)
166
167   char *syntax;
168   char *help;
169 };
170
171 struct cache_item_s;
172 typedef struct cache_item_s cache_item_t;
173 struct cache_item_s
174 {
175   char *file;
176   char **values;
177   size_t values_num;
178   time_t last_flush_time;
179   time_t last_update_stamp;
180 #define CI_FLAGS_IN_TREE  (1<<0)
181 #define CI_FLAGS_IN_QUEUE (1<<1)
182   int flags;
183   pthread_cond_t  flushed;
184   cache_item_t *prev;
185   cache_item_t *next;
186 };
187
188 struct callback_flush_data_s
189 {
190   time_t now;
191   time_t abs_timeout;
192   char **keys;
193   size_t keys_num;
194 };
195 typedef struct callback_flush_data_s callback_flush_data_t;
196
197 enum queue_side_e
198 {
199   HEAD,
200   TAIL
201 };
202 typedef enum queue_side_e queue_side_t;
203
204 /* describe a set of journal files */
205 typedef struct {
206   char **files;
207   size_t files_num;
208 } journal_set;
209
210 /* max length of socket command or response */
211 #define CMD_MAX 4096
212 #define RBUF_SIZE (CMD_MAX*2)
213
214 /*
215  * Variables
216  */
217 static int stay_foreground = 0;
218 static uid_t daemon_uid;
219
220 static listen_socket_t *listen_fds = NULL;
221 static size_t listen_fds_num = 0;
222
223 enum {
224   RUNNING,              /* normal operation */
225   FLUSHING,             /* flushing remaining values */
226   SHUTDOWN              /* shutting down */
227 } state = RUNNING;
228
229 static pthread_t *queue_threads;
230 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
231 static int config_queue_threads = 4;
232
233 static pthread_t flush_thread;
234 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
235
236 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
237 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
238 static int connection_threads_num = 0;
239
240 /* Cache stuff */
241 static GTree          *cache_tree = NULL;
242 static cache_item_t   *cache_queue_head = NULL;
243 static cache_item_t   *cache_queue_tail = NULL;
244 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
245
246 static int config_write_interval = 300;
247 static int config_write_jitter   = 0;
248 static int config_flush_interval = 3600;
249 static int config_flush_at_shutdown = 0;
250 static char *config_pid_file = NULL;
251 static char *config_base_dir = NULL;
252 static size_t _config_base_dir_len = 0;
253 static int config_write_base_only = 0;
254
255 static listen_socket_t **config_listen_address_list = NULL;
256 static size_t config_listen_address_list_len = 0;
257
258 static uint64_t stats_queue_length = 0;
259 static uint64_t stats_updates_received = 0;
260 static uint64_t stats_flush_received = 0;
261 static uint64_t stats_updates_written = 0;
262 static uint64_t stats_data_sets_written = 0;
263 static uint64_t stats_journal_bytes = 0;
264 static uint64_t stats_journal_rotate = 0;
265 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
266
267 /* Journaled updates */
268 #define JOURNAL_REPLAY(s) ((s) == NULL)
269 #define JOURNAL_BASE "rrd.journal"
270 static journal_set *journal_cur = NULL;
271 static journal_set *journal_old = NULL;
272 static char *journal_dir = NULL;
273 static FILE *journal_fh = NULL;         /* current journal file handle */
274 static long  journal_size = 0;          /* current journal size */
275 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
276 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
277 static int journal_write(char *cmd, char *args);
278 static void journal_done(void);
279 static void journal_rotate(void);
280
281 /* prototypes for forward refernces */
282 static int handle_request_help (HANDLER_PROTO);
283
284 /* 
285  * Functions
286  */
287 static void sig_common (const char *sig) /* {{{ */
288 {
289   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
290   state = FLUSHING;
291   pthread_cond_broadcast(&flush_cond);
292   pthread_cond_broadcast(&queue_cond);
293 } /* }}} void sig_common */
294
295 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
296 {
297   sig_common("INT");
298 } /* }}} void sig_int_handler */
299
300 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
301 {
302   sig_common("TERM");
303 } /* }}} void sig_term_handler */
304
305 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
306 {
307   config_flush_at_shutdown = 1;
308   sig_common("USR1");
309 } /* }}} void sig_usr1_handler */
310
311 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
312 {
313   config_flush_at_shutdown = 0;
314   sig_common("USR2");
315 } /* }}} void sig_usr2_handler */
316
317 static void install_signal_handlers(void) /* {{{ */
318 {
319   /* These structures are static, because `sigaction' behaves weird if the are
320    * overwritten.. */
321   static struct sigaction sa_int;
322   static struct sigaction sa_term;
323   static struct sigaction sa_pipe;
324   static struct sigaction sa_usr1;
325   static struct sigaction sa_usr2;
326
327   /* Install signal handlers */
328   memset (&sa_int, 0, sizeof (sa_int));
329   sa_int.sa_handler = sig_int_handler;
330   sigaction (SIGINT, &sa_int, NULL);
331
332   memset (&sa_term, 0, sizeof (sa_term));
333   sa_term.sa_handler = sig_term_handler;
334   sigaction (SIGTERM, &sa_term, NULL);
335
336   memset (&sa_pipe, 0, sizeof (sa_pipe));
337   sa_pipe.sa_handler = SIG_IGN;
338   sigaction (SIGPIPE, &sa_pipe, NULL);
339
340   memset (&sa_pipe, 0, sizeof (sa_usr1));
341   sa_usr1.sa_handler = sig_usr1_handler;
342   sigaction (SIGUSR1, &sa_usr1, NULL);
343
344   memset (&sa_usr2, 0, sizeof (sa_usr2));
345   sa_usr2.sa_handler = sig_usr2_handler;
346   sigaction (SIGUSR2, &sa_usr2, NULL);
347
348 } /* }}} void install_signal_handlers */
349
350 static int open_pidfile(char *action, int oflag) /* {{{ */
351 {
352   int fd;
353   const char *file;
354   char *file_copy, *dir;
355
356   file = (config_pid_file != NULL)
357     ? config_pid_file
358     : LOCALSTATEDIR "/run/rrdcached.pid";
359
360   /* dirname may modify its argument */
361   file_copy = strdup(file);
362   if (file_copy == NULL)
363   {
364     fprintf(stderr, "rrdcached: strdup(): %s\n",
365         rrd_strerror(errno));
366     return -1;
367   }
368
369   dir = dirname(file_copy);
370   if (rrd_mkdir_p(dir, 0777) != 0)
371   {
372     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
373         dir, rrd_strerror(errno));
374     return -1;
375   }
376
377   free(file_copy);
378
379   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
380   if (fd < 0)
381     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
382             action, file, rrd_strerror(errno));
383
384   return(fd);
385 } /* }}} static int open_pidfile */
386
387 /* check existing pid file to see whether a daemon is running */
388 static int check_pidfile(void)
389 {
390   int pid_fd;
391   pid_t pid;
392   char pid_str[16];
393
394   pid_fd = open_pidfile("open", O_RDWR);
395   if (pid_fd < 0)
396     return pid_fd;
397
398   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
399     return -1;
400
401   pid = atoi(pid_str);
402   if (pid <= 0)
403     return -1;
404
405   /* another running process that we can signal COULD be
406    * a competing rrdcached */
407   if (pid != getpid() && kill(pid, 0) == 0)
408   {
409     fprintf(stderr,
410             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
411     close(pid_fd);
412     return -1;
413   }
414
415   lseek(pid_fd, 0, SEEK_SET);
416   if (ftruncate(pid_fd, 0) == -1)
417   {
418     fprintf(stderr,
419             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
420     close(pid_fd);
421     return -1;
422   }
423
424   fprintf(stderr,
425           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
426           "rrdcached: starting normally.\n", pid);
427
428   return pid_fd;
429 } /* }}} static int check_pidfile */
430
431 static int write_pidfile (int fd) /* {{{ */
432 {
433   pid_t pid;
434   FILE *fh;
435
436   pid = getpid ();
437
438   fh = fdopen (fd, "w");
439   if (fh == NULL)
440   {
441     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
442     close(fd);
443     return (-1);
444   }
445
446   fprintf (fh, "%i\n", (int) pid);
447   fclose (fh);
448
449   return (0);
450 } /* }}} int write_pidfile */
451
452 static int remove_pidfile (void) /* {{{ */
453 {
454   char *file;
455   int status;
456
457   file = (config_pid_file != NULL)
458     ? config_pid_file
459     : LOCALSTATEDIR "/run/rrdcached.pid";
460
461   status = unlink (file);
462   if (status == 0)
463     return (0);
464   return (errno);
465 } /* }}} int remove_pidfile */
466
467 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
468 {
469   char *eol;
470
471   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
472                sock->next_read - sock->next_cmd);
473
474   if (eol == NULL)
475   {
476     /* no commands left, move remainder back to front of rbuf */
477     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
478             sock->next_read - sock->next_cmd);
479     sock->next_read -= sock->next_cmd;
480     sock->next_cmd = 0;
481     *len = 0;
482     return NULL;
483   }
484   else
485   {
486     char *cmd = sock->rbuf + sock->next_cmd;
487     *eol = '\0';
488
489     sock->next_cmd = eol - sock->rbuf + 1;
490
491     if (eol > sock->rbuf && *(eol-1) == '\r')
492       *(--eol) = '\0'; /* handle "\r\n" EOL */
493
494     *len = eol - cmd;
495
496     return cmd;
497   }
498
499   /* NOTREACHED */
500   assert(1==0);
501 } /* }}} char *next_cmd */
502
503 /* add the characters directly to the write buffer */
504 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
505 {
506   char *new_buf;
507
508   assert(sock != NULL);
509
510   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
511   if (new_buf == NULL)
512   {
513     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
514     return -1;
515   }
516
517   strncpy(new_buf + sock->wbuf_len, str, len + 1);
518
519   sock->wbuf = new_buf;
520   sock->wbuf_len += len;
521
522   return 0;
523 } /* }}} static int add_to_wbuf */
524
525 /* add the text to the "extra" info that's sent after the status line */
526 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
527 {
528   va_list argp;
529   char buffer[CMD_MAX];
530   int len;
531
532   if (JOURNAL_REPLAY(sock)) return 0;
533   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
534
535   va_start(argp, fmt);
536 #ifdef HAVE_VSNPRINTF
537   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
538 #else
539   len = vsprintf(buffer, fmt, argp);
540 #endif
541   va_end(argp);
542   if (len < 0)
543   {
544     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
545     return -1;
546   }
547
548   return add_to_wbuf(sock, buffer, len);
549 } /* }}} static int add_response_info */
550
551 static int count_lines(char *str) /* {{{ */
552 {
553   int lines = 0;
554
555   if (str != NULL)
556   {
557     while ((str = strchr(str, '\n')) != NULL)
558     {
559       ++lines;
560       ++str;
561     }
562   }
563
564   return lines;
565 } /* }}} static int count_lines */
566
567 /* send the response back to the user.
568  * returns 0 on success, -1 on error
569  * write buffer is always zeroed after this call */
570 static int send_response (listen_socket_t *sock, response_code rc,
571                           char *fmt, ...) /* {{{ */
572 {
573   va_list argp;
574   char buffer[CMD_MAX];
575   int lines;
576   ssize_t wrote;
577   int rclen, len;
578
579   if (JOURNAL_REPLAY(sock)) return rc;
580
581   if (sock->batch_start)
582   {
583     if (rc == RESP_OK)
584       return rc; /* no response on success during BATCH */
585     lines = sock->batch_cmd;
586   }
587   else if (rc == RESP_OK)
588     lines = count_lines(sock->wbuf);
589   else
590     lines = -1;
591
592   rclen = sprintf(buffer, "%d ", lines);
593   va_start(argp, fmt);
594 #ifdef HAVE_VSNPRINTF
595   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
596 #else
597   len = vsprintf(buffer+rclen, fmt, argp);
598 #endif
599   va_end(argp);
600   if (len < 0)
601     return -1;
602
603   len += rclen;
604
605   /* append the result to the wbuf, don't write to the user */
606   if (sock->batch_start)
607     return add_to_wbuf(sock, buffer, len);
608
609   /* first write must be complete */
610   if (len != write(sock->fd, buffer, len))
611   {
612     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
613     return -1;
614   }
615
616   if (sock->wbuf != NULL && rc == RESP_OK)
617   {
618     wrote = 0;
619     while (wrote < sock->wbuf_len)
620     {
621       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
622       if (wb <= 0)
623       {
624         RRDD_LOG(LOG_INFO, "send_response: could not write results");
625         return -1;
626       }
627       wrote += wb;
628     }
629   }
630
631   free(sock->wbuf); sock->wbuf = NULL;
632   sock->wbuf_len = 0;
633
634   return 0;
635 } /* }}} */
636
637 static void wipe_ci_values(cache_item_t *ci, time_t when)
638 {
639   ci->values = NULL;
640   ci->values_num = 0;
641
642   ci->last_flush_time = when;
643   if (config_write_jitter > 0)
644     ci->last_flush_time += (rrd_random() % config_write_jitter);
645 }
646
647 /* remove_from_queue
648  * remove a "cache_item_t" item from the queue.
649  * must hold 'cache_lock' when calling this
650  */
651 static void remove_from_queue(cache_item_t *ci) /* {{{ */
652 {
653   if (ci == NULL) return;
654   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
655
656   if (ci->prev == NULL)
657     cache_queue_head = ci->next; /* reset head */
658   else
659     ci->prev->next = ci->next;
660
661   if (ci->next == NULL)
662     cache_queue_tail = ci->prev; /* reset the tail */
663   else
664     ci->next->prev = ci->prev;
665
666   ci->next = ci->prev = NULL;
667   ci->flags &= ~CI_FLAGS_IN_QUEUE;
668
669   pthread_mutex_lock (&stats_lock);
670   assert (stats_queue_length > 0);
671   stats_queue_length--;
672   pthread_mutex_unlock (&stats_lock);
673
674 } /* }}} static void remove_from_queue */
675
676 /* free the resources associated with the cache_item_t
677  * must hold cache_lock when calling this function
678  */
679 static void *free_cache_item(cache_item_t *ci) /* {{{ */
680 {
681   if (ci == NULL) return NULL;
682
683   remove_from_queue(ci);
684
685   for (size_t i=0; i < ci->values_num; i++)
686     free(ci->values[i]);
687
688   free (ci->values);
689   free (ci->file);
690
691   /* in case anyone is waiting */
692   pthread_cond_broadcast(&ci->flushed);
693   pthread_cond_destroy(&ci->flushed);
694
695   free (ci);
696
697   return NULL;
698 } /* }}} static void *free_cache_item */
699
700 /*
701  * enqueue_cache_item:
702  * `cache_lock' must be acquired before calling this function!
703  */
704 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
705     queue_side_t side)
706 {
707   if (ci == NULL)
708     return (-1);
709
710   if (ci->values_num == 0)
711     return (0);
712
713   if (side == HEAD)
714   {
715     if (cache_queue_head == ci)
716       return 0;
717
718     /* remove if further down in queue */
719     remove_from_queue(ci);
720
721     ci->prev = NULL;
722     ci->next = cache_queue_head;
723     if (ci->next != NULL)
724       ci->next->prev = ci;
725     cache_queue_head = ci;
726
727     if (cache_queue_tail == NULL)
728       cache_queue_tail = cache_queue_head;
729   }
730   else /* (side == TAIL) */
731   {
732     /* We don't move values back in the list.. */
733     if (ci->flags & CI_FLAGS_IN_QUEUE)
734       return (0);
735
736     assert (ci->next == NULL);
737     assert (ci->prev == NULL);
738
739     ci->prev = cache_queue_tail;
740
741     if (cache_queue_tail == NULL)
742       cache_queue_head = ci;
743     else
744       cache_queue_tail->next = ci;
745
746     cache_queue_tail = ci;
747   }
748
749   ci->flags |= CI_FLAGS_IN_QUEUE;
750
751   pthread_cond_signal(&queue_cond);
752   pthread_mutex_lock (&stats_lock);
753   stats_queue_length++;
754   pthread_mutex_unlock (&stats_lock);
755
756   return (0);
757 } /* }}} int enqueue_cache_item */
758
759 /*
760  * tree_callback_flush:
761  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
762  * while this is in progress.
763  */
764 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
765     gpointer data)
766 {
767   cache_item_t *ci;
768   callback_flush_data_t *cfd;
769
770   ci = (cache_item_t *) value;
771   cfd = (callback_flush_data_t *) data;
772
773   if (ci->flags & CI_FLAGS_IN_QUEUE)
774     return FALSE;
775
776   if (ci->values_num > 0
777       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
778   {
779     enqueue_cache_item (ci, TAIL);
780   }
781   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
782       && (ci->values_num <= 0))
783   {
784     assert ((char *) key == ci->file);
785     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
786     {
787       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
788       return (FALSE);
789     }
790   }
791
792   return (FALSE);
793 } /* }}} gboolean tree_callback_flush */
794
795 static int flush_old_values (int max_age)
796 {
797   callback_flush_data_t cfd;
798   size_t k;
799
800   memset (&cfd, 0, sizeof (cfd));
801   /* Pass the current time as user data so that we don't need to call
802    * `time' for each node. */
803   cfd.now = time (NULL);
804   cfd.keys = NULL;
805   cfd.keys_num = 0;
806
807   if (max_age > 0)
808     cfd.abs_timeout = cfd.now - max_age;
809   else
810     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
811
812   /* `tree_callback_flush' will return the keys of all values that haven't
813    * been touched in the last `config_flush_interval' seconds in `cfd'.
814    * The char*'s in this array point to the same memory as ci->file, so we
815    * don't need to free them separately. */
816   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
817
818   for (k = 0; k < cfd.keys_num; k++)
819   {
820     /* should never fail, since we have held the cache_lock
821      * the entire time */
822     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
823   }
824
825   if (cfd.keys != NULL)
826   {
827     free (cfd.keys);
828     cfd.keys = NULL;
829   }
830
831   return (0);
832 } /* int flush_old_values */
833
834 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
835 {
836   struct timeval now;
837   struct timespec next_flush;
838   int status;
839
840   gettimeofday (&now, NULL);
841   next_flush.tv_sec = now.tv_sec + config_flush_interval;
842   next_flush.tv_nsec = 1000 * now.tv_usec;
843
844   pthread_mutex_lock(&cache_lock);
845
846   while (state == RUNNING)
847   {
848     gettimeofday (&now, NULL);
849     if ((now.tv_sec > next_flush.tv_sec)
850         || ((now.tv_sec == next_flush.tv_sec)
851           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
852     {
853       RRDD_LOG(LOG_DEBUG, "flushing old values");
854
855       /* Determine the time of the next cache flush. */
856       next_flush.tv_sec = now.tv_sec + config_flush_interval;
857
858       /* Flush all values that haven't been written in the last
859        * `config_write_interval' seconds. */
860       flush_old_values (config_write_interval);
861
862       /* unlock the cache while we rotate so we don't block incoming
863        * updates if the fsync() blocks on disk I/O */
864       pthread_mutex_unlock(&cache_lock);
865       journal_rotate();
866       pthread_mutex_lock(&cache_lock);
867     }
868
869     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
870     if (status != 0 && status != ETIMEDOUT)
871     {
872       RRDD_LOG (LOG_ERR, "flush_thread_main: "
873                 "pthread_cond_timedwait returned %i.", status);
874     }
875   }
876
877   if (config_flush_at_shutdown)
878     flush_old_values (-1); /* flush everything */
879
880   state = SHUTDOWN;
881
882   pthread_mutex_unlock(&cache_lock);
883
884   return NULL;
885 } /* void *flush_thread_main */
886
887 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
888 {
889   pthread_mutex_lock (&cache_lock);
890
891   while (state != SHUTDOWN
892          || (cache_queue_head != NULL && config_flush_at_shutdown))
893   {
894     cache_item_t *ci;
895     char *file;
896     char **values;
897     size_t values_num;
898     int status;
899
900     /* Now, check if there's something to store away. If not, wait until
901      * something comes in. */
902     if (cache_queue_head == NULL)
903     {
904       status = pthread_cond_wait (&queue_cond, &cache_lock);
905       if ((status != 0) && (status != ETIMEDOUT))
906       {
907         RRDD_LOG (LOG_ERR, "queue_thread_main: "
908             "pthread_cond_wait returned %i.", status);
909       }
910     }
911
912     /* Check if a value has arrived. This may be NULL if we timed out or there
913      * was an interrupt such as a signal. */
914     if (cache_queue_head == NULL)
915       continue;
916
917     ci = cache_queue_head;
918
919     /* copy the relevant parts */
920     file = strdup (ci->file);
921     if (file == NULL)
922     {
923       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
924       continue;
925     }
926
927     assert(ci->values != NULL);
928     assert(ci->values_num > 0);
929
930     values = ci->values;
931     values_num = ci->values_num;
932
933     wipe_ci_values(ci, time(NULL));
934     remove_from_queue(ci);
935
936     pthread_mutex_unlock (&cache_lock);
937
938     rrd_clear_error ();
939     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
940     if (status != 0)
941     {
942       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
943           "rrd_update_r (%s) failed with status %i. (%s)",
944           file, status, rrd_get_error());
945     }
946
947     journal_write("wrote", file);
948
949     /* Search again in the tree.  It's possible someone issued a "FORGET"
950      * while we were writing the update values. */
951     pthread_mutex_lock(&cache_lock);
952     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
953     if (ci)
954       pthread_cond_broadcast(&ci->flushed);
955     pthread_mutex_unlock(&cache_lock);
956
957     if (status == 0)
958     {
959       pthread_mutex_lock (&stats_lock);
960       stats_updates_written++;
961       stats_data_sets_written += values_num;
962       pthread_mutex_unlock (&stats_lock);
963     }
964
965     rrd_free_ptrs((void ***) &values, &values_num);
966     free(file);
967
968     pthread_mutex_lock (&cache_lock);
969   }
970   pthread_mutex_unlock (&cache_lock);
971
972   return (NULL);
973 } /* }}} void *queue_thread_main */
974
975 static int buffer_get_field (char **buffer_ret, /* {{{ */
976     size_t *buffer_size_ret, char **field_ret)
977 {
978   char *buffer;
979   size_t buffer_pos;
980   size_t buffer_size;
981   char *field;
982   size_t field_size;
983   int status;
984
985   buffer = *buffer_ret;
986   buffer_pos = 0;
987   buffer_size = *buffer_size_ret;
988   field = *buffer_ret;
989   field_size = 0;
990
991   if (buffer_size <= 0)
992     return (-1);
993
994   /* This is ensured by `handle_request'. */
995   assert (buffer[buffer_size - 1] == '\0');
996
997   status = -1;
998   while (buffer_pos < buffer_size)
999   {
1000     /* Check for end-of-field or end-of-buffer */
1001     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1002     {
1003       field[field_size] = 0;
1004       field_size++;
1005       buffer_pos++;
1006       status = 0;
1007       break;
1008     }
1009     /* Handle escaped characters. */
1010     else if (buffer[buffer_pos] == '\\')
1011     {
1012       if (buffer_pos >= (buffer_size - 1))
1013         break;
1014       buffer_pos++;
1015       field[field_size] = buffer[buffer_pos];
1016       field_size++;
1017       buffer_pos++;
1018     }
1019     /* Normal operation */ 
1020     else
1021     {
1022       field[field_size] = buffer[buffer_pos];
1023       field_size++;
1024       buffer_pos++;
1025     }
1026   } /* while (buffer_pos < buffer_size) */
1027
1028   if (status != 0)
1029     return (status);
1030
1031   *buffer_ret = buffer + buffer_pos;
1032   *buffer_size_ret = buffer_size - buffer_pos;
1033   *field_ret = field;
1034
1035   return (0);
1036 } /* }}} int buffer_get_field */
1037
1038 /* if we're restricting writes to the base directory,
1039  * check whether the file falls within the dir
1040  * returns 1 if OK, otherwise 0
1041  */
1042 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1043 {
1044   assert(file != NULL);
1045
1046   if (!config_write_base_only
1047       || JOURNAL_REPLAY(sock)
1048       || config_base_dir == NULL)
1049     return 1;
1050
1051   if (strstr(file, "../") != NULL) goto err;
1052
1053   /* relative paths without "../" are ok */
1054   if (*file != '/') return 1;
1055
1056   /* file must be of the format base + "/" + <1+ char filename> */
1057   if (strlen(file) < _config_base_dir_len + 2) goto err;
1058   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1059   if (*(file + _config_base_dir_len) != '/') goto err;
1060
1061   return 1;
1062
1063 err:
1064   if (sock != NULL && sock->fd >= 0)
1065     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1066
1067   return 0;
1068 } /* }}} static int check_file_access */
1069
1070 /* when using a base dir, convert relative paths to absolute paths.
1071  * if necessary, modifies the "filename" pointer to point
1072  * to the new path created in "tmp".  "tmp" is provided
1073  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1074  *
1075  * this allows us to optimize for the expected case (absolute path)
1076  * with a no-op.
1077  */
1078 static void get_abs_path(char **filename, char *tmp)
1079 {
1080   assert(tmp != NULL);
1081   assert(filename != NULL && *filename != NULL);
1082
1083   if (config_base_dir == NULL || **filename == '/')
1084     return;
1085
1086   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1087   *filename = tmp;
1088 } /* }}} static int get_abs_path */
1089
1090 static int flush_file (const char *filename) /* {{{ */
1091 {
1092   cache_item_t *ci;
1093
1094   pthread_mutex_lock (&cache_lock);
1095
1096   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1097   if (ci == NULL)
1098   {
1099     pthread_mutex_unlock (&cache_lock);
1100     return (ENOENT);
1101   }
1102
1103   if (ci->values_num > 0)
1104   {
1105     /* Enqueue at head */
1106     enqueue_cache_item (ci, HEAD);
1107     pthread_cond_wait(&ci->flushed, &cache_lock);
1108   }
1109
1110   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1111    * may have been purged during our cond_wait() */
1112
1113   pthread_mutex_unlock(&cache_lock);
1114
1115   return (0);
1116 } /* }}} int flush_file */
1117
1118 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1119 {
1120   char *err = "Syntax error.\n";
1121
1122   if (cmd && cmd->syntax)
1123     err = cmd->syntax;
1124
1125   return send_response(sock, RESP_ERR, "Usage: %s", err);
1126 } /* }}} static int syntax_error() */
1127
1128 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1129 {
1130   uint64_t copy_queue_length;
1131   uint64_t copy_updates_received;
1132   uint64_t copy_flush_received;
1133   uint64_t copy_updates_written;
1134   uint64_t copy_data_sets_written;
1135   uint64_t copy_journal_bytes;
1136   uint64_t copy_journal_rotate;
1137
1138   uint64_t tree_nodes_number;
1139   uint64_t tree_depth;
1140
1141   pthread_mutex_lock (&stats_lock);
1142   copy_queue_length       = stats_queue_length;
1143   copy_updates_received   = stats_updates_received;
1144   copy_flush_received     = stats_flush_received;
1145   copy_updates_written    = stats_updates_written;
1146   copy_data_sets_written  = stats_data_sets_written;
1147   copy_journal_bytes      = stats_journal_bytes;
1148   copy_journal_rotate     = stats_journal_rotate;
1149   pthread_mutex_unlock (&stats_lock);
1150
1151   pthread_mutex_lock (&cache_lock);
1152   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1153   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1154   pthread_mutex_unlock (&cache_lock);
1155
1156   add_response_info(sock,
1157                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1158   add_response_info(sock,
1159                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1160   add_response_info(sock,
1161                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1162   add_response_info(sock,
1163                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1164   add_response_info(sock,
1165                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1166   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1167   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1168   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1169   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1170
1171   send_response(sock, RESP_OK, "Statistics follow\n");
1172
1173   return (0);
1174 } /* }}} int handle_request_stats */
1175
1176 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1177 {
1178   char *file, file_tmp[PATH_MAX];
1179   int status;
1180
1181   status = buffer_get_field (&buffer, &buffer_size, &file);
1182   if (status != 0)
1183   {
1184     return syntax_error(sock,cmd);
1185   }
1186   else
1187   {
1188     pthread_mutex_lock(&stats_lock);
1189     stats_flush_received++;
1190     pthread_mutex_unlock(&stats_lock);
1191
1192     get_abs_path(&file, file_tmp);
1193     if (!check_file_access(file, sock)) return 0;
1194
1195     status = flush_file (file);
1196     if (status == 0)
1197       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1198     else if (status == ENOENT)
1199     {
1200       /* no file in our tree; see whether it exists at all */
1201       struct stat statbuf;
1202
1203       memset(&statbuf, 0, sizeof(statbuf));
1204       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1205         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1206       else
1207         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1208     }
1209     else if (status < 0)
1210       return send_response(sock, RESP_ERR, "Internal error.\n");
1211     else
1212       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1213   }
1214
1215   /* NOTREACHED */
1216   assert(1==0);
1217 } /* }}} int handle_request_flush */
1218
1219 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1220 {
1221   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1222
1223   pthread_mutex_lock(&cache_lock);
1224   flush_old_values(-1);
1225   pthread_mutex_unlock(&cache_lock);
1226
1227   return send_response(sock, RESP_OK, "Started flush.\n");
1228 } /* }}} static int handle_request_flushall */
1229
1230 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1231 {
1232   int status;
1233   char *file, file_tmp[PATH_MAX];
1234   cache_item_t *ci;
1235
1236   status = buffer_get_field(&buffer, &buffer_size, &file);
1237   if (status != 0)
1238     return syntax_error(sock,cmd);
1239
1240   get_abs_path(&file, file_tmp);
1241
1242   pthread_mutex_lock(&cache_lock);
1243   ci = g_tree_lookup(cache_tree, file);
1244   if (ci == NULL)
1245   {
1246     pthread_mutex_unlock(&cache_lock);
1247     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1248   }
1249
1250   for (size_t i=0; i < ci->values_num; i++)
1251     add_response_info(sock, "%s\n", ci->values[i]);
1252
1253   pthread_mutex_unlock(&cache_lock);
1254   return send_response(sock, RESP_OK, "updates pending\n");
1255 } /* }}} static int handle_request_pending */
1256
1257 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1258 {
1259   int status;
1260   gboolean found;
1261   char *file, file_tmp[PATH_MAX];
1262
1263   status = buffer_get_field(&buffer, &buffer_size, &file);
1264   if (status != 0)
1265     return syntax_error(sock,cmd);
1266
1267   get_abs_path(&file, file_tmp);
1268   if (!check_file_access(file, sock)) return 0;
1269
1270   pthread_mutex_lock(&cache_lock);
1271   found = g_tree_remove(cache_tree, file);
1272   pthread_mutex_unlock(&cache_lock);
1273
1274   if (found == TRUE)
1275   {
1276     if (!JOURNAL_REPLAY(sock))
1277       journal_write("forget", file);
1278
1279     return send_response(sock, RESP_OK, "Gone!\n");
1280   }
1281   else
1282     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1283
1284   /* NOTREACHED */
1285   assert(1==0);
1286 } /* }}} static int handle_request_forget */
1287
1288 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1289 {
1290   cache_item_t *ci;
1291
1292   pthread_mutex_lock(&cache_lock);
1293
1294   ci = cache_queue_head;
1295   while (ci != NULL)
1296   {
1297     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1298     ci = ci->next;
1299   }
1300
1301   pthread_mutex_unlock(&cache_lock);
1302
1303   return send_response(sock, RESP_OK, "in queue.\n");
1304 } /* }}} int handle_request_queue */
1305
1306 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1307 {
1308   char *file, file_tmp[PATH_MAX];
1309   int values_num = 0;
1310   int status;
1311   char orig_buf[CMD_MAX];
1312
1313   cache_item_t *ci;
1314
1315   /* save it for the journal later */
1316   if (!JOURNAL_REPLAY(sock))
1317     strncpy(orig_buf, buffer, buffer_size);
1318
1319   status = buffer_get_field (&buffer, &buffer_size, &file);
1320   if (status != 0)
1321     return syntax_error(sock,cmd);
1322
1323   pthread_mutex_lock(&stats_lock);
1324   stats_updates_received++;
1325   pthread_mutex_unlock(&stats_lock);
1326
1327   get_abs_path(&file, file_tmp);
1328   if (!check_file_access(file, sock)) return 0;
1329
1330   pthread_mutex_lock (&cache_lock);
1331   ci = g_tree_lookup (cache_tree, file);
1332
1333   if (ci == NULL) /* {{{ */
1334   {
1335     struct stat statbuf;
1336     cache_item_t *tmp;
1337
1338     /* don't hold the lock while we setup; stat(2) might block */
1339     pthread_mutex_unlock(&cache_lock);
1340
1341     memset (&statbuf, 0, sizeof (statbuf));
1342     status = stat (file, &statbuf);
1343     if (status != 0)
1344     {
1345       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1346
1347       status = errno;
1348       if (status == ENOENT)
1349         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1350       else
1351         return send_response(sock, RESP_ERR,
1352                              "stat failed with error %i.\n", status);
1353     }
1354     if (!S_ISREG (statbuf.st_mode))
1355       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1356
1357     if (access(file, R_OK|W_OK) != 0)
1358       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1359                            file, rrd_strerror(errno));
1360
1361     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1362     if (ci == NULL)
1363     {
1364       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1365
1366       return send_response(sock, RESP_ERR, "malloc failed.\n");
1367     }
1368     memset (ci, 0, sizeof (cache_item_t));
1369
1370     ci->file = strdup (file);
1371     if (ci->file == NULL)
1372     {
1373       free (ci);
1374       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1375
1376       return send_response(sock, RESP_ERR, "strdup failed.\n");
1377     }
1378
1379     wipe_ci_values(ci, now);
1380     ci->flags = CI_FLAGS_IN_TREE;
1381     pthread_cond_init(&ci->flushed, NULL);
1382
1383     pthread_mutex_lock(&cache_lock);
1384
1385     /* another UPDATE might have added this entry in the meantime */
1386     tmp = g_tree_lookup (cache_tree, file);
1387     if (tmp == NULL)
1388       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1389     else
1390     {
1391       free_cache_item (ci);
1392       ci = tmp;
1393     }
1394
1395     /* state may have changed while we were unlocked */
1396     if (state == SHUTDOWN)
1397       return -1;
1398   } /* }}} */
1399   assert (ci != NULL);
1400
1401   /* don't re-write updates in replay mode */
1402   if (!JOURNAL_REPLAY(sock))
1403     journal_write("update", orig_buf);
1404
1405   while (buffer_size > 0)
1406   {
1407     char *value;
1408     time_t stamp;
1409     char *eostamp;
1410
1411     status = buffer_get_field (&buffer, &buffer_size, &value);
1412     if (status != 0)
1413     {
1414       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1415       break;
1416     }
1417
1418     /* make sure update time is always moving forward */
1419     stamp = strtol(value, &eostamp, 10);
1420     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1421     {
1422       pthread_mutex_unlock(&cache_lock);
1423       return send_response(sock, RESP_ERR,
1424                            "Cannot find timestamp in '%s'!\n", value);
1425     }
1426     else if (stamp <= ci->last_update_stamp)
1427     {
1428       pthread_mutex_unlock(&cache_lock);
1429       return send_response(sock, RESP_ERR,
1430                            "illegal attempt to update using time %ld when last"
1431                            " update time is %ld (minimum one second step)\n",
1432                            stamp, ci->last_update_stamp);
1433     }
1434     else
1435       ci->last_update_stamp = stamp;
1436
1437     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1438     {
1439       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1440       continue;
1441     }
1442
1443     values_num++;
1444   }
1445
1446   if (((now - ci->last_flush_time) >= config_write_interval)
1447       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1448       && (ci->values_num > 0))
1449   {
1450     enqueue_cache_item (ci, TAIL);
1451   }
1452
1453   pthread_mutex_unlock (&cache_lock);
1454
1455   if (values_num < 1)
1456     return send_response(sock, RESP_ERR, "No values updated.\n");
1457   else
1458     return send_response(sock, RESP_OK,
1459                          "errors, enqueued %i value(s).\n", values_num);
1460
1461   /* NOTREACHED */
1462   assert(1==0);
1463
1464 } /* }}} int handle_request_update */
1465
1466 /* we came across a "WROTE" entry during journal replay.
1467  * throw away any values that we have accumulated for this file
1468  */
1469 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1470 {
1471   cache_item_t *ci;
1472   const char *file = buffer;
1473
1474   pthread_mutex_lock(&cache_lock);
1475
1476   ci = g_tree_lookup(cache_tree, file);
1477   if (ci == NULL)
1478   {
1479     pthread_mutex_unlock(&cache_lock);
1480     return (0);
1481   }
1482
1483   if (ci->values)
1484     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1485
1486   wipe_ci_values(ci, now);
1487   remove_from_queue(ci);
1488
1489   pthread_mutex_unlock(&cache_lock);
1490   return (0);
1491 } /* }}} int handle_request_wrote */
1492
1493 /* start "BATCH" processing */
1494 static int batch_start (HANDLER_PROTO) /* {{{ */
1495 {
1496   int status;
1497   if (sock->batch_start)
1498     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1499
1500   status = send_response(sock, RESP_OK,
1501                          "Go ahead.  End with dot '.' on its own line.\n");
1502   sock->batch_start = time(NULL);
1503   sock->batch_cmd = 0;
1504
1505   return status;
1506 } /* }}} static int batch_start */
1507
1508 /* finish "BATCH" processing and return results to the client */
1509 static int batch_done (HANDLER_PROTO) /* {{{ */
1510 {
1511   assert(sock->batch_start);
1512   sock->batch_start = 0;
1513   sock->batch_cmd  = 0;
1514   return send_response(sock, RESP_OK, "errors\n");
1515 } /* }}} static int batch_done */
1516
1517 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1518 {
1519   return -1;
1520 } /* }}} static int handle_request_quit */
1521
1522 static command_t list_of_commands[] = { /* {{{ */
1523   {
1524     "UPDATE",
1525     handle_request_update,
1526     CMD_CONTEXT_ANY,
1527     "UPDATE <filename> <values> [<values> ...]\n"
1528     ,
1529     "Adds the given file to the internal cache if it is not yet known and\n"
1530     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1531     "for details.\n"
1532     "\n"
1533     "Each <values> has the following form:\n"
1534     "  <values> = <time>:<value>[:<value>[...]]\n"
1535     "See the rrdupdate(1) manpage for details.\n"
1536   },
1537   {
1538     "WROTE",
1539     handle_request_wrote,
1540     CMD_CONTEXT_JOURNAL,
1541     NULL,
1542     NULL
1543   },
1544   {
1545     "FLUSH",
1546     handle_request_flush,
1547     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1548     "FLUSH <filename>\n"
1549     ,
1550     "Adds the given filename to the head of the update queue and returns\n"
1551     "after it has been dequeued.\n"
1552   },
1553   {
1554     "FLUSHALL",
1555     handle_request_flushall,
1556     CMD_CONTEXT_CLIENT,
1557     "FLUSHALL\n"
1558     ,
1559     "Triggers writing of all pending updates.  Returns immediately.\n"
1560   },
1561   {
1562     "PENDING",
1563     handle_request_pending,
1564     CMD_CONTEXT_CLIENT,
1565     "PENDING <filename>\n"
1566     ,
1567     "Shows any 'pending' updates for a file, in order.\n"
1568     "The updates shown have not yet been written to the underlying RRD file.\n"
1569   },
1570   {
1571     "FORGET",
1572     handle_request_forget,
1573     CMD_CONTEXT_ANY,
1574     "FORGET <filename>\n"
1575     ,
1576     "Removes the file completely from the cache.\n"
1577     "Any pending updates for the file will be lost.\n"
1578   },
1579   {
1580     "QUEUE",
1581     handle_request_queue,
1582     CMD_CONTEXT_CLIENT,
1583     "QUEUE\n"
1584     ,
1585         "Shows all files in the output queue.\n"
1586     "The output is zero or more lines in the following format:\n"
1587     "(where <num_vals> is the number of values to be written)\n"
1588     "\n"
1589     "<num_vals> <filename>\n"
1590   },
1591   {
1592     "STATS",
1593     handle_request_stats,
1594     CMD_CONTEXT_CLIENT,
1595     "STATS\n"
1596     ,
1597     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1598     "a description of the values.\n"
1599   },
1600   {
1601     "HELP",
1602     handle_request_help,
1603     CMD_CONTEXT_CLIENT,
1604     "HELP [<command>]\n",
1605     NULL, /* special! */
1606   },
1607   {
1608     "BATCH",
1609     batch_start,
1610     CMD_CONTEXT_CLIENT,
1611     "BATCH\n"
1612     ,
1613     "The 'BATCH' command permits the client to initiate a bulk load\n"
1614     "   of commands to rrdcached.\n"
1615     "\n"
1616     "Usage:\n"
1617     "\n"
1618     "    client: BATCH\n"
1619     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1620     "    client: command #1\n"
1621     "    client: command #2\n"
1622     "    client: ... and so on\n"
1623     "    client: .\n"
1624     "    server: 2 errors\n"
1625     "    server: 7 message for command #7\n"
1626     "    server: 9 message for command #9\n"
1627     "\n"
1628     "For more information, consult the rrdcached(1) documentation.\n"
1629   },
1630   {
1631     ".",   /* BATCH terminator */
1632     batch_done,
1633     CMD_CONTEXT_BATCH,
1634     NULL,
1635     NULL
1636   },
1637   {
1638     "QUIT",
1639     handle_request_quit,
1640     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1641     "QUIT\n"
1642     ,
1643     "Disconnect from rrdcached.\n"
1644   }
1645 }; /* }}} command_t list_of_commands[] */
1646 static size_t list_of_commands_len = sizeof (list_of_commands)
1647   / sizeof (list_of_commands[0]);
1648
1649 static command_t *find_command(char *cmd)
1650 {
1651   size_t i;
1652
1653   for (i = 0; i < list_of_commands_len; i++)
1654     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1655       return (&list_of_commands[i]);
1656   return NULL;
1657 }
1658
1659 /* We currently use the index in the `list_of_commands' array as a bit position
1660  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1661  * outside these functions so that switching to a more elegant storage method
1662  * is easily possible. */
1663 static ssize_t find_command_index (const char *cmd) /* {{{ */
1664 {
1665   size_t i;
1666
1667   for (i = 0; i < list_of_commands_len; i++)
1668     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1669       return ((ssize_t) i);
1670   return (-1);
1671 } /* }}} ssize_t find_command_index */
1672
1673 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1674     const char *cmd)
1675 {
1676   ssize_t i;
1677
1678   if (JOURNAL_REPLAY(sock))
1679     return (1);
1680
1681   if (cmd == NULL)
1682     return (-1);
1683
1684   if ((strcasecmp ("QUIT", cmd) == 0)
1685       || (strcasecmp ("HELP", cmd) == 0))
1686     return (1);
1687   else if (strcmp (".", cmd) == 0)
1688     cmd = "BATCH";
1689
1690   i = find_command_index (cmd);
1691   if (i < 0)
1692     return (-1);
1693   assert (i < 32);
1694
1695   if ((sock->permissions & (1 << i)) != 0)
1696     return (1);
1697   return (0);
1698 } /* }}} int socket_permission_check */
1699
1700 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1701     const char *cmd)
1702 {
1703   ssize_t i;
1704
1705   i = find_command_index (cmd);
1706   if (i < 0)
1707     return (-1);
1708   assert (i < 32);
1709
1710   sock->permissions |= (1 << i);
1711   return (0);
1712 } /* }}} int socket_permission_add */
1713
1714 /* check whether commands are received in the expected context */
1715 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1716 {
1717   if (JOURNAL_REPLAY(sock))
1718     return (cmd->context & CMD_CONTEXT_JOURNAL);
1719   else if (sock->batch_start)
1720     return (cmd->context & CMD_CONTEXT_BATCH);
1721   else
1722     return (cmd->context & CMD_CONTEXT_CLIENT);
1723
1724   /* NOTREACHED */
1725   assert(1==0);
1726 }
1727
1728 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1729 {
1730   int status;
1731   char *cmd_str;
1732   char *resp_txt;
1733   command_t *help = NULL;
1734
1735   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1736   if (status == 0)
1737     help = find_command(cmd_str);
1738
1739   if (help && (help->syntax || help->help))
1740   {
1741     char tmp[CMD_MAX];
1742
1743     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1744     resp_txt = tmp;
1745
1746     if (help->syntax)
1747       add_response_info(sock, "Usage: %s\n", help->syntax);
1748
1749     if (help->help)
1750       add_response_info(sock, "%s\n", help->help);
1751   }
1752   else
1753   {
1754     size_t i;
1755
1756     resp_txt = "Command overview\n";
1757
1758     for (i = 0; i < list_of_commands_len; i++)
1759     {
1760       if (list_of_commands[i].syntax == NULL)
1761         continue;
1762       add_response_info (sock, "%s", list_of_commands[i].syntax);
1763     }
1764   }
1765
1766   return send_response(sock, RESP_OK, resp_txt);
1767 } /* }}} int handle_request_help */
1768
1769 static int handle_request (DISPATCH_PROTO) /* {{{ */
1770 {
1771   char *buffer_ptr = buffer;
1772   char *cmd_str = NULL;
1773   command_t *cmd = NULL;
1774   int status;
1775
1776   assert (buffer[buffer_size - 1] == '\0');
1777
1778   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1779   if (status != 0)
1780   {
1781     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1782     return (-1);
1783   }
1784
1785   if (sock != NULL && sock->batch_start)
1786     sock->batch_cmd++;
1787
1788   cmd = find_command(cmd_str);
1789   if (!cmd)
1790     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1791
1792   if (!socket_permission_check (sock, cmd->cmd))
1793     return send_response(sock, RESP_ERR, "Permission denied.\n");
1794
1795   if (!command_check_context(sock, cmd))
1796     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1797
1798   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1799 } /* }}} int handle_request */
1800
1801 static void journal_set_free (journal_set *js) /* {{{ */
1802 {
1803   if (js == NULL)
1804     return;
1805
1806   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1807
1808   free(js);
1809 } /* }}} journal_set_free */
1810
1811 static void journal_set_remove (journal_set *js) /* {{{ */
1812 {
1813   if (js == NULL)
1814     return;
1815
1816   for (uint i=0; i < js->files_num; i++)
1817   {
1818     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1819     unlink(js->files[i]);
1820   }
1821 } /* }}} journal_set_remove */
1822
1823 /* close current journal file handle.
1824  * MUST hold journal_lock before calling */
1825 static void journal_close(void) /* {{{ */
1826 {
1827   if (journal_fh != NULL)
1828   {
1829     if (fclose(journal_fh) != 0)
1830       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1831   }
1832
1833   journal_fh = NULL;
1834   journal_size = 0;
1835 } /* }}} journal_close */
1836
1837 /* MUST hold journal_lock before calling */
1838 static void journal_new_file(void) /* {{{ */
1839 {
1840   struct timeval now;
1841   int  new_fd;
1842   char new_file[PATH_MAX + 1];
1843
1844   assert(journal_dir != NULL);
1845   assert(journal_cur != NULL);
1846
1847   journal_close();
1848
1849   gettimeofday(&now, NULL);
1850   /* this format assures that the files sort in strcmp() order */
1851   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1852            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1853
1854   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1855                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1856   if (new_fd < 0)
1857     goto error;
1858
1859   journal_fh = fdopen(new_fd, "a");
1860   if (journal_fh == NULL)
1861     goto error;
1862
1863   journal_size = ftell(journal_fh);
1864   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1865
1866   /* record the file in the journal set */
1867   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1868
1869   return;
1870
1871 error:
1872   RRDD_LOG(LOG_CRIT,
1873            "JOURNALING DISABLED: Error while trying to create %s : %s",
1874            new_file, rrd_strerror(errno));
1875   RRDD_LOG(LOG_CRIT,
1876            "JOURNALING DISABLED: All values will be flushed at shutdown");
1877
1878   close(new_fd);
1879   config_flush_at_shutdown = 1;
1880
1881 } /* }}} journal_new_file */
1882
1883 /* MUST NOT hold journal_lock before calling this */
1884 static void journal_rotate(void) /* {{{ */
1885 {
1886   journal_set *old_js = NULL;
1887
1888   if (journal_dir == NULL)
1889     return;
1890
1891   RRDD_LOG(LOG_DEBUG, "rotating journals");
1892
1893   pthread_mutex_lock(&stats_lock);
1894   ++stats_journal_rotate;
1895   pthread_mutex_unlock(&stats_lock);
1896
1897   pthread_mutex_lock(&journal_lock);
1898
1899   journal_close();
1900
1901   /* rotate the journal sets */
1902   old_js = journal_old;
1903   journal_old = journal_cur;
1904   journal_cur = calloc(1, sizeof(journal_set));
1905
1906   if (journal_cur != NULL)
1907     journal_new_file();
1908   else
1909     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1910
1911   pthread_mutex_unlock(&journal_lock);
1912
1913   journal_set_remove(old_js);
1914   journal_set_free  (old_js);
1915
1916 } /* }}} static void journal_rotate */
1917
1918 /* MUST hold journal_lock when calling */
1919 static void journal_done(void) /* {{{ */
1920 {
1921   if (journal_cur == NULL)
1922     return;
1923
1924   journal_close();
1925
1926   if (config_flush_at_shutdown)
1927   {
1928     RRDD_LOG(LOG_INFO, "removing journals");
1929     journal_set_remove(journal_old);
1930     journal_set_remove(journal_cur);
1931   }
1932   else
1933   {
1934     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1935              "journals will be used at next startup");
1936   }
1937
1938   journal_set_free(journal_cur);
1939   journal_set_free(journal_old);
1940   free(journal_dir);
1941
1942 } /* }}} static void journal_done */
1943
1944 static int journal_write(char *cmd, char *args) /* {{{ */
1945 {
1946   int chars;
1947
1948   if (journal_fh == NULL)
1949     return 0;
1950
1951   pthread_mutex_lock(&journal_lock);
1952   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1953   journal_size += chars;
1954
1955   if (journal_size > JOURNAL_MAX)
1956     journal_new_file();
1957
1958   pthread_mutex_unlock(&journal_lock);
1959
1960   if (chars > 0)
1961   {
1962     pthread_mutex_lock(&stats_lock);
1963     stats_journal_bytes += chars;
1964     pthread_mutex_unlock(&stats_lock);
1965   }
1966
1967   return chars;
1968 } /* }}} static int journal_write */
1969
1970 static int journal_replay (const char *file) /* {{{ */
1971 {
1972   FILE *fh;
1973   int entry_cnt = 0;
1974   int fail_cnt = 0;
1975   uint64_t line = 0;
1976   char entry[CMD_MAX];
1977   time_t now;
1978
1979   if (file == NULL) return 0;
1980
1981   {
1982     char *reason = "unknown error";
1983     int status = 0;
1984     struct stat statbuf;
1985
1986     memset(&statbuf, 0, sizeof(statbuf));
1987     if (stat(file, &statbuf) != 0)
1988     {
1989       reason = "stat error";
1990       status = errno;
1991     }
1992     else if (!S_ISREG(statbuf.st_mode))
1993     {
1994       reason = "not a regular file";
1995       status = EPERM;
1996     }
1997     if (statbuf.st_uid != daemon_uid)
1998     {
1999       reason = "not owned by daemon user";
2000       status = EACCES;
2001     }
2002     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2003     {
2004       reason = "must not be user/group writable";
2005       status = EACCES;
2006     }
2007
2008     if (status != 0)
2009     {
2010       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2011                file, rrd_strerror(status), reason);
2012       return 0;
2013     }
2014   }
2015
2016   fh = fopen(file, "r");
2017   if (fh == NULL)
2018   {
2019     if (errno != ENOENT)
2020       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2021                file, rrd_strerror(errno));
2022     return 0;
2023   }
2024   else
2025     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2026
2027   now = time(NULL);
2028
2029   while(!feof(fh))
2030   {
2031     size_t entry_len;
2032
2033     ++line;
2034     if (fgets(entry, sizeof(entry), fh) == NULL)
2035       break;
2036     entry_len = strlen(entry);
2037
2038     /* check \n termination in case journal writing crashed mid-line */
2039     if (entry_len == 0)
2040       continue;
2041     else if (entry[entry_len - 1] != '\n')
2042     {
2043       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2044       ++fail_cnt;
2045       continue;
2046     }
2047
2048     entry[entry_len - 1] = '\0';
2049
2050     if (handle_request(NULL, now, entry, entry_len) == 0)
2051       ++entry_cnt;
2052     else
2053       ++fail_cnt;
2054   }
2055
2056   fclose(fh);
2057
2058   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2059            entry_cnt, fail_cnt);
2060
2061   return entry_cnt > 0 ? 1 : 0;
2062 } /* }}} static int journal_replay */
2063
2064 static int journal_sort(const void *v1, const void *v2)
2065 {
2066   char **jn1 = (char **) v1;
2067   char **jn2 = (char **) v2;
2068
2069   return strcmp(*jn1,*jn2);
2070 }
2071
2072 static void journal_init(void) /* {{{ */
2073 {
2074   int had_journal = 0;
2075   DIR *dir;
2076   struct dirent *dent;
2077   char path[PATH_MAX+1];
2078
2079   if (journal_dir == NULL) return;
2080
2081   pthread_mutex_lock(&journal_lock);
2082
2083   journal_cur = calloc(1, sizeof(journal_set));
2084   if (journal_cur == NULL)
2085   {
2086     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2087     return;
2088   }
2089
2090   RRDD_LOG(LOG_INFO, "checking for journal files");
2091
2092   /* Handle old journal files during transition.  This gives them the
2093    * correct sort order.  TODO: remove after first release
2094    */
2095   {
2096     char old_path[PATH_MAX+1];
2097     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2098     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2099     rename(old_path, path);
2100
2101     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2102     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2103     rename(old_path, path);
2104   }
2105
2106   dir = opendir(journal_dir);
2107   while ((dent = readdir(dir)) != NULL)
2108   {
2109     /* looks like a journal file? */
2110     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2111       continue;
2112
2113     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2114
2115     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2116     {
2117       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2118                dent->d_name);
2119       break;
2120     }
2121   }
2122   closedir(dir);
2123
2124   qsort(journal_cur->files, journal_cur->files_num,
2125         sizeof(journal_cur->files[0]), journal_sort);
2126
2127   for (uint i=0; i < journal_cur->files_num; i++)
2128     had_journal += journal_replay(journal_cur->files[i]);
2129
2130   journal_new_file();
2131
2132   /* it must have been a crash.  start a flush */
2133   if (had_journal && config_flush_at_shutdown)
2134     flush_old_values(-1);
2135
2136   pthread_mutex_unlock(&journal_lock);
2137
2138   RRDD_LOG(LOG_INFO, "journal processing complete");
2139
2140 } /* }}} static void journal_init */
2141
2142 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2143 {
2144   assert(sock != NULL);
2145
2146   free(sock->rbuf);  sock->rbuf = NULL;
2147   free(sock->wbuf);  sock->wbuf = NULL;
2148   free(sock);
2149 } /* }}} void free_listen_socket */
2150
2151 static void close_connection(listen_socket_t *sock) /* {{{ */
2152 {
2153   if (sock->fd >= 0)
2154   {
2155     close(sock->fd);
2156     sock->fd = -1;
2157   }
2158
2159   free_listen_socket(sock);
2160
2161 } /* }}} void close_connection */
2162
2163 static void *connection_thread_main (void *args) /* {{{ */
2164 {
2165   listen_socket_t *sock;
2166   int fd;
2167
2168   sock = (listen_socket_t *) args;
2169   fd = sock->fd;
2170
2171   /* init read buffers */
2172   sock->next_read = sock->next_cmd = 0;
2173   sock->rbuf = malloc(RBUF_SIZE);
2174   if (sock->rbuf == NULL)
2175   {
2176     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2177     close_connection(sock);
2178     return NULL;
2179   }
2180
2181   pthread_mutex_lock (&connection_threads_lock);
2182   connection_threads_num++;
2183   pthread_mutex_unlock (&connection_threads_lock);
2184
2185   while (state == RUNNING)
2186   {
2187     char *cmd;
2188     ssize_t cmd_len;
2189     ssize_t rbytes;
2190     time_t now;
2191
2192     struct pollfd pollfd;
2193     int status;
2194
2195     pollfd.fd = fd;
2196     pollfd.events = POLLIN | POLLPRI;
2197     pollfd.revents = 0;
2198
2199     status = poll (&pollfd, 1, /* timeout = */ 500);
2200     if (state != RUNNING)
2201       break;
2202     else if (status == 0) /* timeout */
2203       continue;
2204     else if (status < 0) /* error */
2205     {
2206       status = errno;
2207       if (status != EINTR)
2208         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2209       continue;
2210     }
2211
2212     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2213       break;
2214     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2215     {
2216       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2217           "poll(2) returned something unexpected: %#04hx",
2218           pollfd.revents);
2219       break;
2220     }
2221
2222     rbytes = read(fd, sock->rbuf + sock->next_read,
2223                   RBUF_SIZE - sock->next_read);
2224     if (rbytes < 0)
2225     {
2226       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2227       break;
2228     }
2229     else if (rbytes == 0)
2230       break; /* eof */
2231
2232     sock->next_read += rbytes;
2233
2234     if (sock->batch_start)
2235       now = sock->batch_start;
2236     else
2237       now = time(NULL);
2238
2239     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2240     {
2241       status = handle_request (sock, now, cmd, cmd_len+1);
2242       if (status != 0)
2243         goto out_close;
2244     }
2245   }
2246
2247 out_close:
2248   close_connection(sock);
2249
2250   /* Remove this thread from the connection threads list */
2251   pthread_mutex_lock (&connection_threads_lock);
2252   connection_threads_num--;
2253   if (connection_threads_num <= 0)
2254     pthread_cond_broadcast(&connection_threads_done);
2255   pthread_mutex_unlock (&connection_threads_lock);
2256
2257   return (NULL);
2258 } /* }}} void *connection_thread_main */
2259
2260 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2261 {
2262   int fd;
2263   struct sockaddr_un sa;
2264   listen_socket_t *temp;
2265   int status;
2266   const char *path;
2267   char *path_copy, *dir;
2268
2269   path = sock->addr;
2270   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2271     path += strlen("unix:");
2272
2273   /* dirname may modify its argument */
2274   path_copy = strdup(path);
2275   if (path_copy == NULL)
2276   {
2277     fprintf(stderr, "rrdcached: strdup(): %s\n",
2278         rrd_strerror(errno));
2279     return (-1);
2280   }
2281
2282   dir = dirname(path_copy);
2283   if (rrd_mkdir_p(dir, 0777) != 0)
2284   {
2285     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2286         dir, rrd_strerror(errno));
2287     return (-1);
2288   }
2289
2290   free(path_copy);
2291
2292   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2293       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2294   if (temp == NULL)
2295   {
2296     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2297     return (-1);
2298   }
2299   listen_fds = temp;
2300   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2301
2302   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2303   if (fd < 0)
2304   {
2305     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2306              rrd_strerror(errno));
2307     return (-1);
2308   }
2309
2310   memset (&sa, 0, sizeof (sa));
2311   sa.sun_family = AF_UNIX;
2312   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2313
2314   /* if we've gotten this far, we own the pid file.  any daemon started
2315    * with the same args must not be alive.  therefore, ensure that we can
2316    * create the socket...
2317    */
2318   unlink(path);
2319
2320   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2321   if (status != 0)
2322   {
2323     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2324              path, rrd_strerror(errno));
2325     close (fd);
2326     return (-1);
2327   }
2328
2329   status = listen (fd, /* backlog = */ 10);
2330   if (status != 0)
2331   {
2332     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2333              path, rrd_strerror(errno));
2334     close (fd);
2335     unlink (path);
2336     return (-1);
2337   }
2338
2339   listen_fds[listen_fds_num].fd = fd;
2340   listen_fds[listen_fds_num].family = PF_UNIX;
2341   strncpy(listen_fds[listen_fds_num].addr, path,
2342           sizeof (listen_fds[listen_fds_num].addr) - 1);
2343   listen_fds_num++;
2344
2345   return (0);
2346 } /* }}} int open_listen_socket_unix */
2347
2348 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2349 {
2350   struct addrinfo ai_hints;
2351   struct addrinfo *ai_res;
2352   struct addrinfo *ai_ptr;
2353   char addr_copy[NI_MAXHOST];
2354   char *addr;
2355   char *port;
2356   int status;
2357
2358   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2359   addr_copy[sizeof (addr_copy) - 1] = 0;
2360   addr = addr_copy;
2361
2362   memset (&ai_hints, 0, sizeof (ai_hints));
2363   ai_hints.ai_flags = 0;
2364 #ifdef AI_ADDRCONFIG
2365   ai_hints.ai_flags |= AI_ADDRCONFIG;
2366 #endif
2367   ai_hints.ai_family = AF_UNSPEC;
2368   ai_hints.ai_socktype = SOCK_STREAM;
2369
2370   port = NULL;
2371   if (*addr == '[') /* IPv6+port format */
2372   {
2373     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2374     addr++;
2375
2376     port = strchr (addr, ']');
2377     if (port == NULL)
2378     {
2379       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2380       return (-1);
2381     }
2382     *port = 0;
2383     port++;
2384
2385     if (*port == ':')
2386       port++;
2387     else if (*port == 0)
2388       port = NULL;
2389     else
2390     {
2391       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2392       return (-1);
2393     }
2394   } /* if (*addr == '[') */
2395   else
2396   {
2397     port = rindex(addr, ':');
2398     if (port != NULL)
2399     {
2400       *port = 0;
2401       port++;
2402     }
2403   }
2404   ai_res = NULL;
2405   status = getaddrinfo (addr,
2406                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2407                         &ai_hints, &ai_res);
2408   if (status != 0)
2409   {
2410     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2411              addr, gai_strerror (status));
2412     return (-1);
2413   }
2414
2415   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2416   {
2417     int fd;
2418     listen_socket_t *temp;
2419     int one = 1;
2420
2421     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2422         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2423     if (temp == NULL)
2424     {
2425       fprintf (stderr,
2426                "rrdcached: open_listen_socket_network: realloc failed.\n");
2427       continue;
2428     }
2429     listen_fds = temp;
2430     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2431
2432     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2433     if (fd < 0)
2434     {
2435       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2436                rrd_strerror(errno));
2437       continue;
2438     }
2439
2440     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2441
2442     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2443     if (status != 0)
2444     {
2445       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2446                sock->addr, rrd_strerror(errno));
2447       close (fd);
2448       continue;
2449     }
2450
2451     status = listen (fd, /* backlog = */ 10);
2452     if (status != 0)
2453     {
2454       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2455                sock->addr, rrd_strerror(errno));
2456       close (fd);
2457       freeaddrinfo(ai_res);
2458       return (-1);
2459     }
2460
2461     listen_fds[listen_fds_num].fd = fd;
2462     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2463     listen_fds_num++;
2464   } /* for (ai_ptr) */
2465
2466   freeaddrinfo(ai_res);
2467   return (0);
2468 } /* }}} static int open_listen_socket_network */
2469
2470 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2471 {
2472   assert(sock != NULL);
2473   assert(sock->addr != NULL);
2474
2475   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2476       || sock->addr[0] == '/')
2477     return (open_listen_socket_unix(sock));
2478   else
2479     return (open_listen_socket_network(sock));
2480 } /* }}} int open_listen_socket */
2481
2482 static int close_listen_sockets (void) /* {{{ */
2483 {
2484   size_t i;
2485
2486   for (i = 0; i < listen_fds_num; i++)
2487   {
2488     close (listen_fds[i].fd);
2489
2490     if (listen_fds[i].family == PF_UNIX)
2491       unlink(listen_fds[i].addr);
2492   }
2493
2494   free (listen_fds);
2495   listen_fds = NULL;
2496   listen_fds_num = 0;
2497
2498   return (0);
2499 } /* }}} int close_listen_sockets */
2500
2501 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2502 {
2503   struct pollfd *pollfds;
2504   int pollfds_num;
2505   int status;
2506   int i;
2507
2508   if (listen_fds_num < 1)
2509   {
2510     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2511     return (NULL);
2512   }
2513
2514   pollfds_num = listen_fds_num;
2515   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2516   if (pollfds == NULL)
2517   {
2518     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2519     return (NULL);
2520   }
2521   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2522
2523   RRDD_LOG(LOG_INFO, "listening for connections");
2524
2525   while (state == RUNNING)
2526   {
2527     for (i = 0; i < pollfds_num; i++)
2528     {
2529       pollfds[i].fd = listen_fds[i].fd;
2530       pollfds[i].events = POLLIN | POLLPRI;
2531       pollfds[i].revents = 0;
2532     }
2533
2534     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2535     if (state != RUNNING)
2536       break;
2537     else if (status == 0) /* timeout */
2538       continue;
2539     else if (status < 0) /* error */
2540     {
2541       status = errno;
2542       if (status != EINTR)
2543       {
2544         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2545       }
2546       continue;
2547     }
2548
2549     for (i = 0; i < pollfds_num; i++)
2550     {
2551       listen_socket_t *client_sock;
2552       struct sockaddr_storage client_sa;
2553       socklen_t client_sa_size;
2554       pthread_t tid;
2555       pthread_attr_t attr;
2556
2557       if (pollfds[i].revents == 0)
2558         continue;
2559
2560       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2561       {
2562         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2563             "poll(2) returned something unexpected for listen FD #%i.",
2564             pollfds[i].fd);
2565         continue;
2566       }
2567
2568       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2569       if (client_sock == NULL)
2570       {
2571         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2572         continue;
2573       }
2574       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2575
2576       client_sa_size = sizeof (client_sa);
2577       client_sock->fd = accept (pollfds[i].fd,
2578           (struct sockaddr *) &client_sa, &client_sa_size);
2579       if (client_sock->fd < 0)
2580       {
2581         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2582         free(client_sock);
2583         continue;
2584       }
2585
2586       pthread_attr_init (&attr);
2587       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2588
2589       status = pthread_create (&tid, &attr, connection_thread_main,
2590                                client_sock);
2591       if (status != 0)
2592       {
2593         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2594         close_connection(client_sock);
2595         continue;
2596       }
2597     } /* for (pollfds_num) */
2598   } /* while (state == RUNNING) */
2599
2600   RRDD_LOG(LOG_INFO, "starting shutdown");
2601
2602   close_listen_sockets ();
2603
2604   pthread_mutex_lock (&connection_threads_lock);
2605   while (connection_threads_num > 0)
2606     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2607   pthread_mutex_unlock (&connection_threads_lock);
2608
2609   free(pollfds);
2610
2611   return (NULL);
2612 } /* }}} void *listen_thread_main */
2613
2614 static int daemonize (void) /* {{{ */
2615 {
2616   int pid_fd;
2617   char *base_dir;
2618
2619   daemon_uid = geteuid();
2620
2621   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2622   if (pid_fd < 0)
2623     pid_fd = check_pidfile();
2624   if (pid_fd < 0)
2625     return pid_fd;
2626
2627   /* open all the listen sockets */
2628   if (config_listen_address_list_len > 0)
2629   {
2630     for (size_t i = 0; i < config_listen_address_list_len; i++)
2631       open_listen_socket (config_listen_address_list[i]);
2632
2633     rrd_free_ptrs((void ***) &config_listen_address_list,
2634                   &config_listen_address_list_len);
2635   }
2636   else
2637   {
2638     listen_socket_t sock;
2639     memset(&sock, 0, sizeof(sock));
2640     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2641     open_listen_socket (&sock);
2642   }
2643
2644   if (listen_fds_num < 1)
2645   {
2646     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2647     goto error;
2648   }
2649
2650   if (!stay_foreground)
2651   {
2652     pid_t child;
2653
2654     child = fork ();
2655     if (child < 0)
2656     {
2657       fprintf (stderr, "daemonize: fork(2) failed.\n");
2658       goto error;
2659     }
2660     else if (child > 0)
2661       exit(0);
2662
2663     /* Become session leader */
2664     setsid ();
2665
2666     /* Open the first three file descriptors to /dev/null */
2667     close (2);
2668     close (1);
2669     close (0);
2670
2671     open ("/dev/null", O_RDWR);
2672     if (dup(0) == -1 || dup(0) == -1){
2673         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2674     }
2675   } /* if (!stay_foreground) */
2676
2677   /* Change into the /tmp directory. */
2678   base_dir = (config_base_dir != NULL)
2679     ? config_base_dir
2680     : "/tmp";
2681
2682   if (chdir (base_dir) != 0)
2683   {
2684     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2685     goto error;
2686   }
2687
2688   install_signal_handlers();
2689
2690   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2691   RRDD_LOG(LOG_INFO, "starting up");
2692
2693   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2694                                 (GDestroyNotify) free_cache_item);
2695   if (cache_tree == NULL)
2696   {
2697     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2698     goto error;
2699   }
2700
2701   return write_pidfile (pid_fd);
2702
2703 error:
2704   remove_pidfile();
2705   return -1;
2706 } /* }}} int daemonize */
2707
2708 static int cleanup (void) /* {{{ */
2709 {
2710   pthread_cond_broadcast (&flush_cond);
2711   pthread_join (flush_thread, NULL);
2712
2713   pthread_cond_broadcast (&queue_cond);
2714   for (int i = 0; i < config_queue_threads; i++)
2715     pthread_join (queue_threads[i], NULL);
2716
2717   if (config_flush_at_shutdown)
2718   {
2719     assert(cache_queue_head == NULL);
2720     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2721   }
2722
2723   free(queue_threads);
2724   free(config_base_dir);
2725
2726   pthread_mutex_lock(&cache_lock);
2727   g_tree_destroy(cache_tree);
2728
2729   pthread_mutex_lock(&journal_lock);
2730   journal_done();
2731
2732   RRDD_LOG(LOG_INFO, "goodbye");
2733   closelog ();
2734
2735   remove_pidfile ();
2736   free(config_pid_file);
2737
2738   return (0);
2739 } /* }}} int cleanup */
2740
2741 static int read_options (int argc, char **argv) /* {{{ */
2742 {
2743   int option;
2744   int status = 0;
2745
2746   char **permissions = NULL;
2747   size_t permissions_len = 0;
2748
2749   while ((option = getopt(argc, argv, "gl:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2750   {
2751     switch (option)
2752     {
2753       case 'g':
2754         stay_foreground=1;
2755         break;
2756
2757       case 'l':
2758       {
2759         listen_socket_t *new;
2760
2761         new = malloc(sizeof(listen_socket_t));
2762         if (new == NULL)
2763         {
2764           fprintf(stderr, "read_options: malloc failed.\n");
2765           return(2);
2766         }
2767         memset(new, 0, sizeof(listen_socket_t));
2768
2769         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2770
2771         /* Add permissions to the socket {{{ */
2772         if (permissions_len != 0)
2773         {
2774           size_t i;
2775           for (i = 0; i < permissions_len; i++)
2776           {
2777             status = socket_permission_add (new, permissions[i]);
2778             if (status != 0)
2779             {
2780               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2781                   "socket failed. Most likely, this permission doesn't "
2782                   "exist. Check your command line.\n", permissions[i]);
2783               status = 4;
2784             }
2785           }
2786         }
2787         else /* if (permissions_len == 0) */
2788         {
2789           /* Add permission for ALL commands to the socket. */
2790           size_t i;
2791           for (i = 0; i < list_of_commands_len; i++)
2792           {
2793             status = socket_permission_add (new, list_of_commands[i].cmd);
2794             if (status != 0)
2795             {
2796               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2797                   "socket failed. This should never happen, ever! Sorry.\n",
2798                   permissions[i]);
2799               status = 4;
2800             }
2801           }
2802         }
2803         /* }}} Done adding permissions. */
2804
2805         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2806                          &config_listen_address_list_len, new))
2807         {
2808           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2809           return (2);
2810         }
2811       }
2812       break;
2813
2814       case 'P':
2815       {
2816         char *optcopy;
2817         char *saveptr;
2818         char *dummy;
2819         char *ptr;
2820
2821         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2822
2823         optcopy = strdup (optarg);
2824         dummy = optcopy;
2825         saveptr = NULL;
2826         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2827         {
2828           dummy = NULL;
2829           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2830         }
2831
2832         free (optcopy);
2833       }
2834       break;
2835
2836       case 'f':
2837       {
2838         int temp;
2839
2840         temp = atoi (optarg);
2841         if (temp > 0)
2842           config_flush_interval = temp;
2843         else
2844         {
2845           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2846           status = 3;
2847         }
2848       }
2849       break;
2850
2851       case 'w':
2852       {
2853         int temp;
2854
2855         temp = atoi (optarg);
2856         if (temp > 0)
2857           config_write_interval = temp;
2858         else
2859         {
2860           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2861           status = 2;
2862         }
2863       }
2864       break;
2865
2866       case 'z':
2867       {
2868         int temp;
2869
2870         temp = atoi(optarg);
2871         if (temp > 0)
2872           config_write_jitter = temp;
2873         else
2874         {
2875           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2876           status = 2;
2877         }
2878
2879         break;
2880       }
2881
2882       case 't':
2883       {
2884         int threads;
2885         threads = atoi(optarg);
2886         if (threads >= 1)
2887           config_queue_threads = threads;
2888         else
2889         {
2890           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2891           return 1;
2892         }
2893       }
2894       break;
2895
2896       case 'B':
2897         config_write_base_only = 1;
2898         break;
2899
2900       case 'b':
2901       {
2902         size_t len;
2903         char base_realpath[PATH_MAX];
2904
2905         if (config_base_dir != NULL)
2906           free (config_base_dir);
2907         config_base_dir = strdup (optarg);
2908         if (config_base_dir == NULL)
2909         {
2910           fprintf (stderr, "read_options: strdup failed.\n");
2911           return (3);
2912         }
2913
2914         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2915         {
2916           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2917               config_base_dir, rrd_strerror (errno));
2918           return (3);
2919         }
2920
2921         /* make sure that the base directory is not resolved via
2922          * symbolic links.  this makes some performance-enhancing
2923          * assumptions possible (we don't have to resolve paths
2924          * that start with a "/")
2925          */
2926         if (realpath(config_base_dir, base_realpath) == NULL)
2927         {
2928           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
2929               "%s\n", config_base_dir, rrd_strerror(errno));
2930           return 5;
2931         }
2932
2933         len = strlen (config_base_dir);
2934         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2935         {
2936           config_base_dir[len - 1] = 0;
2937           len--;
2938         }
2939
2940         if (len < 1)
2941         {
2942           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2943           return (4);
2944         }
2945
2946         _config_base_dir_len = len;
2947
2948         len = strlen (base_realpath);
2949         while ((len > 0) && (base_realpath[len - 1] == '/'))
2950         {
2951           base_realpath[len - 1] = '\0';
2952           len--;
2953         }
2954
2955         if (strncmp(config_base_dir,
2956                          base_realpath, sizeof(base_realpath)) != 0)
2957         {
2958           fprintf(stderr,
2959                   "Base directory (-b) resolved via file system links!\n"
2960                   "Please consult rrdcached '-b' documentation!\n"
2961                   "Consider specifying the real directory (%s)\n",
2962                   base_realpath);
2963           return 5;
2964         }
2965       }
2966       break;
2967
2968       case 'p':
2969       {
2970         if (config_pid_file != NULL)
2971           free (config_pid_file);
2972         config_pid_file = strdup (optarg);
2973         if (config_pid_file == NULL)
2974         {
2975           fprintf (stderr, "read_options: strdup failed.\n");
2976           return (3);
2977         }
2978       }
2979       break;
2980
2981       case 'F':
2982         config_flush_at_shutdown = 1;
2983         break;
2984
2985       case 'j':
2986       {
2987         const char *dir = journal_dir = strdup(optarg);
2988
2989         status = rrd_mkdir_p(dir, 0777);
2990         if (status != 0)
2991         {
2992           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
2993               dir, rrd_strerror(errno));
2994           return 6;
2995         }
2996
2997         if (access(dir, R_OK|W_OK|X_OK) != 0)
2998         {
2999           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3000                   errno ? rrd_strerror(errno) : "");
3001           return 6;
3002         }
3003       }
3004       break;
3005
3006       case 'h':
3007       case '?':
3008         printf ("RRDCacheD %s\n"
3009             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3010             "\n"
3011             "Usage: rrdcached [options]\n"
3012             "\n"
3013             "Valid options are:\n"
3014             "  -l <address>  Socket address to listen to.\n"
3015             "  -P <perms>    Sets the permissions to assign to all following "
3016                             "sockets\n"
3017             "  -w <seconds>  Interval in which to write data.\n"
3018             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3019             "  -t <threads>  Number of write threads.\n"
3020             "  -f <seconds>  Interval in which to flush dead data.\n"
3021             "  -p <file>     Location of the PID-file.\n"
3022             "  -b <dir>      Base directory to change to.\n"
3023             "  -B            Restrict file access to paths within -b <dir>\n"
3024             "  -g            Do not fork and run in the foreground.\n"
3025             "  -j <dir>      Directory in which to create the journal files.\n"
3026             "  -F            Always flush all updates at shutdown\n"
3027             "\n"
3028             "For more information and a detailed description of all options "
3029             "please refer\n"
3030             "to the rrdcached(1) manual page.\n",
3031             VERSION);
3032         status = -1;
3033         break;
3034     } /* switch (option) */
3035   } /* while (getopt) */
3036
3037   /* advise the user when values are not sane */
3038   if (config_flush_interval < 2 * config_write_interval)
3039     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3040             " 2x write interval (-w) !\n");
3041   if (config_write_jitter > config_write_interval)
3042     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3043             " write interval (-w) !\n");
3044
3045   if (config_write_base_only && config_base_dir == NULL)
3046     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3047             "  Consult the rrdcached documentation\n");
3048
3049   if (journal_dir == NULL)
3050     config_flush_at_shutdown = 1;
3051
3052   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3053
3054   return (status);
3055 } /* }}} int read_options */
3056
3057 int main (int argc, char **argv)
3058 {
3059   int status;
3060
3061   status = read_options (argc, argv);
3062   if (status != 0)
3063   {
3064     if (status < 0)
3065       status = 0;
3066     return (status);
3067   }
3068
3069   status = daemonize ();
3070   if (status != 0)
3071   {
3072     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3073     return (1);
3074   }
3075
3076   journal_init();
3077
3078   /* start the queue threads */
3079   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3080   if (queue_threads == NULL)
3081   {
3082     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3083     cleanup();
3084     return (1);
3085   }
3086   for (int i = 0; i < config_queue_threads; i++)
3087   {
3088     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3089     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3090     if (status != 0)
3091     {
3092       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3093       cleanup();
3094       return (1);
3095     }
3096   }
3097
3098   /* start the flush thread */
3099   memset(&flush_thread, 0, sizeof(flush_thread));
3100   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3101   if (status != 0)
3102   {
3103     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3104     cleanup();
3105     return (1);
3106   }
3107
3108   listen_thread_main (NULL);
3109   cleanup ();
3110
3111   return (0);
3112 } /* int main */
3113
3114 /*
3115  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3116  */