print \n for log messages while runing rrdcached in the foreground ... suggested...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #include <glib-2.0/glib.h>
113 /* }}} */
114
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
122
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127
128 struct listen_socket_s
129 {
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
133
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
137
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
142
143   char *wbuf;
144   ssize_t wbuf_len;
145
146   uint32_t permissions;
147
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
152
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
160
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
163
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
167
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
173
174   char *syntax;
175   char *help;
176 };
177
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
181 {
182   char *file;
183   char **values;
184   size_t values_num;            /* number of valid pointers */
185   size_t values_alloc;          /* number of allocated pointers */
186   time_t last_flush_time;
187   time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE  (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
190   int flags;
191   pthread_cond_t  flushed;
192   cache_item_t *prev;
193   cache_item_t *next;
194 };
195
196 struct callback_flush_data_s
197 {
198   time_t now;
199   time_t abs_timeout;
200   char **keys;
201   size_t keys_num;
202 };
203 typedef struct callback_flush_data_s callback_flush_data_t;
204
205 enum queue_side_e
206 {
207   HEAD,
208   TAIL
209 };
210 typedef enum queue_side_e queue_side_t;
211
212 /* describe a set of journal files */
213 typedef struct {
214   char **files;
215   size_t files_num;
216 } journal_set;
217
218 /* max length of socket command or response */
219 #define CMD_MAX 4096
220 #define RBUF_SIZE (CMD_MAX*2)
221
222 /*
223  * Variables
224  */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
227
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
230
231 enum {
232   RUNNING,              /* normal operation */
233   FLUSHING,             /* flushing remaining values */
234   SHUTDOWN              /* shutting down */
235 } state = RUNNING;
236
237 static pthread_t *queue_threads;
238 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
239 static int config_queue_threads = 4;
240
241 static pthread_t flush_thread;
242 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
243
244 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
245 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
246 static int connection_threads_num = 0;
247
248 /* Cache stuff */
249 static GTree          *cache_tree = NULL;
250 static cache_item_t   *cache_queue_head = NULL;
251 static cache_item_t   *cache_queue_tail = NULL;
252 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
253
254 static int config_write_interval = 300;
255 static int config_write_jitter   = 0;
256 static int config_flush_interval = 3600;
257 static int config_flush_at_shutdown = 0;
258 static char *config_pid_file = NULL;
259 static char *config_base_dir = NULL;
260 static size_t _config_base_dir_len = 0;
261 static int config_write_base_only = 0;
262 static size_t config_alloc_chunk = 1;
263
264 static listen_socket_t **config_listen_address_list = NULL;
265 static size_t config_listen_address_list_len = 0;
266
267 static uint64_t stats_queue_length = 0;
268 static uint64_t stats_updates_received = 0;
269 static uint64_t stats_flush_received = 0;
270 static uint64_t stats_updates_written = 0;
271 static uint64_t stats_data_sets_written = 0;
272 static uint64_t stats_journal_bytes = 0;
273 static uint64_t stats_journal_rotate = 0;
274 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
275
276 /* Journaled updates */
277 #define JOURNAL_REPLAY(s) ((s) == NULL)
278 #define JOURNAL_BASE "rrd.journal"
279 static journal_set *journal_cur = NULL;
280 static journal_set *journal_old = NULL;
281 static char *journal_dir = NULL;
282 static FILE *journal_fh = NULL;         /* current journal file handle */
283 static long  journal_size = 0;          /* current journal size */
284 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
285 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
286 static int journal_write(char *cmd, char *args);
287 static void journal_done(void);
288 static void journal_rotate(void);
289
290 /* prototypes for forward refernces */
291 static int handle_request_help (HANDLER_PROTO);
292
293 /* 
294  * Functions
295  */
296 static void sig_common (const char *sig) /* {{{ */
297 {
298   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299   state = FLUSHING;
300   pthread_cond_broadcast(&flush_cond);
301   pthread_cond_broadcast(&queue_cond);
302 } /* }}} void sig_common */
303
304 static void sig_int_handler (int UNUSED(s)) /* {{{ */
305 {
306   sig_common("INT");
307 } /* }}} void sig_int_handler */
308
309 static void sig_term_handler (int UNUSED(s)) /* {{{ */
310 {
311   sig_common("TERM");
312 } /* }}} void sig_term_handler */
313
314 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
315 {
316   config_flush_at_shutdown = 1;
317   sig_common("USR1");
318 } /* }}} void sig_usr1_handler */
319
320 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
321 {
322   config_flush_at_shutdown = 0;
323   sig_common("USR2");
324 } /* }}} void sig_usr2_handler */
325
326 static void install_signal_handlers(void) /* {{{ */
327 {
328   /* These structures are static, because `sigaction' behaves weird if the are
329    * overwritten.. */
330   static struct sigaction sa_int;
331   static struct sigaction sa_term;
332   static struct sigaction sa_pipe;
333   static struct sigaction sa_usr1;
334   static struct sigaction sa_usr2;
335
336   /* Install signal handlers */
337   memset (&sa_int, 0, sizeof (sa_int));
338   sa_int.sa_handler = sig_int_handler;
339   sigaction (SIGINT, &sa_int, NULL);
340
341   memset (&sa_term, 0, sizeof (sa_term));
342   sa_term.sa_handler = sig_term_handler;
343   sigaction (SIGTERM, &sa_term, NULL);
344
345   memset (&sa_pipe, 0, sizeof (sa_pipe));
346   sa_pipe.sa_handler = SIG_IGN;
347   sigaction (SIGPIPE, &sa_pipe, NULL);
348
349   memset (&sa_pipe, 0, sizeof (sa_usr1));
350   sa_usr1.sa_handler = sig_usr1_handler;
351   sigaction (SIGUSR1, &sa_usr1, NULL);
352
353   memset (&sa_usr2, 0, sizeof (sa_usr2));
354   sa_usr2.sa_handler = sig_usr2_handler;
355   sigaction (SIGUSR2, &sa_usr2, NULL);
356
357 } /* }}} void install_signal_handlers */
358
359 static int open_pidfile(char *action, int oflag) /* {{{ */
360 {
361   int fd;
362   const char *file;
363   char *file_copy, *dir;
364
365   file = (config_pid_file != NULL)
366     ? config_pid_file
367     : LOCALSTATEDIR "/run/rrdcached.pid";
368
369   /* dirname may modify its argument */
370   file_copy = strdup(file);
371   if (file_copy == NULL)
372   {
373     fprintf(stderr, "rrdcached: strdup(): %s\n",
374         rrd_strerror(errno));
375     return -1;
376   }
377
378   dir = dirname(file_copy);
379   if (rrd_mkdir_p(dir, 0777) != 0)
380   {
381     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
382         dir, rrd_strerror(errno));
383     return -1;
384   }
385
386   free(file_copy);
387
388   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389   if (fd < 0)
390     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
391             action, file, rrd_strerror(errno));
392
393   return(fd);
394 } /* }}} static int open_pidfile */
395
396 /* check existing pid file to see whether a daemon is running */
397 static int check_pidfile(void)
398 {
399   int pid_fd;
400   pid_t pid;
401   char pid_str[16];
402
403   pid_fd = open_pidfile("open", O_RDWR);
404   if (pid_fd < 0)
405     return pid_fd;
406
407   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
408     return -1;
409
410   pid = atoi(pid_str);
411   if (pid <= 0)
412     return -1;
413
414   /* another running process that we can signal COULD be
415    * a competing rrdcached */
416   if (pid != getpid() && kill(pid, 0) == 0)
417   {
418     fprintf(stderr,
419             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
420     close(pid_fd);
421     return -1;
422   }
423
424   lseek(pid_fd, 0, SEEK_SET);
425   if (ftruncate(pid_fd, 0) == -1)
426   {
427     fprintf(stderr,
428             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
429     close(pid_fd);
430     return -1;
431   }
432
433   fprintf(stderr,
434           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
435           "rrdcached: starting normally.\n", pid);
436
437   return pid_fd;
438 } /* }}} static int check_pidfile */
439
440 static int write_pidfile (int fd) /* {{{ */
441 {
442   pid_t pid;
443   FILE *fh;
444
445   pid = getpid ();
446
447   fh = fdopen (fd, "w");
448   if (fh == NULL)
449   {
450     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
451     close(fd);
452     return (-1);
453   }
454
455   fprintf (fh, "%i\n", (int) pid);
456   fclose (fh);
457
458   return (0);
459 } /* }}} int write_pidfile */
460
461 static int remove_pidfile (void) /* {{{ */
462 {
463   char *file;
464   int status;
465
466   file = (config_pid_file != NULL)
467     ? config_pid_file
468     : LOCALSTATEDIR "/run/rrdcached.pid";
469
470   status = unlink (file);
471   if (status == 0)
472     return (0);
473   return (errno);
474 } /* }}} int remove_pidfile */
475
476 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
477 {
478   char *eol;
479
480   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
481                sock->next_read - sock->next_cmd);
482
483   if (eol == NULL)
484   {
485     /* no commands left, move remainder back to front of rbuf */
486     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
487             sock->next_read - sock->next_cmd);
488     sock->next_read -= sock->next_cmd;
489     sock->next_cmd = 0;
490     *len = 0;
491     return NULL;
492   }
493   else
494   {
495     char *cmd = sock->rbuf + sock->next_cmd;
496     *eol = '\0';
497
498     sock->next_cmd = eol - sock->rbuf + 1;
499
500     if (eol > sock->rbuf && *(eol-1) == '\r')
501       *(--eol) = '\0'; /* handle "\r\n" EOL */
502
503     *len = eol - cmd;
504
505     return cmd;
506   }
507
508   /* NOTREACHED */
509   assert(1==0);
510 } /* }}} char *next_cmd */
511
512 /* add the characters directly to the write buffer */
513 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
514 {
515   char *new_buf;
516
517   assert(sock != NULL);
518
519   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
520   if (new_buf == NULL)
521   {
522     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
523     return -1;
524   }
525
526   strncpy(new_buf + sock->wbuf_len, str, len + 1);
527
528   sock->wbuf = new_buf;
529   sock->wbuf_len += len;
530
531   return 0;
532 } /* }}} static int add_to_wbuf */
533
534 /* add the text to the "extra" info that's sent after the status line */
535 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
536 {
537   va_list argp;
538   char buffer[CMD_MAX];
539   int len;
540
541   if (JOURNAL_REPLAY(sock)) return 0;
542   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
543
544   va_start(argp, fmt);
545 #ifdef HAVE_VSNPRINTF
546   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
547 #else
548   len = vsprintf(buffer, fmt, argp);
549 #endif
550   va_end(argp);
551   if (len < 0)
552   {
553     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
554     return -1;
555   }
556
557   return add_to_wbuf(sock, buffer, len);
558 } /* }}} static int add_response_info */
559
560 static int count_lines(char *str) /* {{{ */
561 {
562   int lines = 0;
563
564   if (str != NULL)
565   {
566     while ((str = strchr(str, '\n')) != NULL)
567     {
568       ++lines;
569       ++str;
570     }
571   }
572
573   return lines;
574 } /* }}} static int count_lines */
575
576 /* send the response back to the user.
577  * returns 0 on success, -1 on error
578  * write buffer is always zeroed after this call */
579 static int send_response (listen_socket_t *sock, response_code rc,
580                           char *fmt, ...) /* {{{ */
581 {
582   va_list argp;
583   char buffer[CMD_MAX];
584   int lines;
585   ssize_t wrote;
586   int rclen, len;
587
588   if (JOURNAL_REPLAY(sock)) return rc;
589
590   if (sock->batch_start)
591   {
592     if (rc == RESP_OK)
593       return rc; /* no response on success during BATCH */
594     lines = sock->batch_cmd;
595   }
596   else if (rc == RESP_OK)
597     lines = count_lines(sock->wbuf);
598   else
599     lines = -1;
600
601   rclen = sprintf(buffer, "%d ", lines);
602   va_start(argp, fmt);
603 #ifdef HAVE_VSNPRINTF
604   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 #else
606   len = vsprintf(buffer+rclen, fmt, argp);
607 #endif
608   va_end(argp);
609   if (len < 0)
610     return -1;
611
612   len += rclen;
613
614   /* append the result to the wbuf, don't write to the user */
615   if (sock->batch_start)
616     return add_to_wbuf(sock, buffer, len);
617
618   /* first write must be complete */
619   if (len != write(sock->fd, buffer, len))
620   {
621     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
622     return -1;
623   }
624
625   if (sock->wbuf != NULL && rc == RESP_OK)
626   {
627     wrote = 0;
628     while (wrote < sock->wbuf_len)
629     {
630       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
631       if (wb <= 0)
632       {
633         RRDD_LOG(LOG_INFO, "send_response: could not write results");
634         return -1;
635       }
636       wrote += wb;
637     }
638   }
639
640   free(sock->wbuf); sock->wbuf = NULL;
641   sock->wbuf_len = 0;
642
643   return 0;
644 } /* }}} */
645
646 static void wipe_ci_values(cache_item_t *ci, time_t when)
647 {
648   ci->values = NULL;
649   ci->values_num = 0;
650   ci->values_alloc = 0;
651
652   ci->last_flush_time = when;
653   if (config_write_jitter > 0)
654     ci->last_flush_time += (rrd_random() % config_write_jitter);
655 }
656
657 /* remove_from_queue
658  * remove a "cache_item_t" item from the queue.
659  * must hold 'cache_lock' when calling this
660  */
661 static void remove_from_queue(cache_item_t *ci) /* {{{ */
662 {
663   if (ci == NULL) return;
664   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665
666   if (ci->prev == NULL)
667     cache_queue_head = ci->next; /* reset head */
668   else
669     ci->prev->next = ci->next;
670
671   if (ci->next == NULL)
672     cache_queue_tail = ci->prev; /* reset the tail */
673   else
674     ci->next->prev = ci->prev;
675
676   ci->next = ci->prev = NULL;
677   ci->flags &= ~CI_FLAGS_IN_QUEUE;
678
679   pthread_mutex_lock (&stats_lock);
680   assert (stats_queue_length > 0);
681   stats_queue_length--;
682   pthread_mutex_unlock (&stats_lock);
683
684 } /* }}} static void remove_from_queue */
685
686 /* free the resources associated with the cache_item_t
687  * must hold cache_lock when calling this function
688  */
689 static void *free_cache_item(cache_item_t *ci) /* {{{ */
690 {
691   if (ci == NULL) return NULL;
692
693   remove_from_queue(ci);
694
695   for (size_t i=0; i < ci->values_num; i++)
696     free(ci->values[i]);
697
698   free (ci->values);
699   free (ci->file);
700
701   /* in case anyone is waiting */
702   pthread_cond_broadcast(&ci->flushed);
703   pthread_cond_destroy(&ci->flushed);
704
705   free (ci);
706
707   return NULL;
708 } /* }}} static void *free_cache_item */
709
710 /*
711  * enqueue_cache_item:
712  * `cache_lock' must be acquired before calling this function!
713  */
714 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
715     queue_side_t side)
716 {
717   if (ci == NULL)
718     return (-1);
719
720   if (ci->values_num == 0)
721     return (0);
722
723   if (side == HEAD)
724   {
725     if (cache_queue_head == ci)
726       return 0;
727
728     /* remove if further down in queue */
729     remove_from_queue(ci);
730
731     ci->prev = NULL;
732     ci->next = cache_queue_head;
733     if (ci->next != NULL)
734       ci->next->prev = ci;
735     cache_queue_head = ci;
736
737     if (cache_queue_tail == NULL)
738       cache_queue_tail = cache_queue_head;
739   }
740   else /* (side == TAIL) */
741   {
742     /* We don't move values back in the list.. */
743     if (ci->flags & CI_FLAGS_IN_QUEUE)
744       return (0);
745
746     assert (ci->next == NULL);
747     assert (ci->prev == NULL);
748
749     ci->prev = cache_queue_tail;
750
751     if (cache_queue_tail == NULL)
752       cache_queue_head = ci;
753     else
754       cache_queue_tail->next = ci;
755
756     cache_queue_tail = ci;
757   }
758
759   ci->flags |= CI_FLAGS_IN_QUEUE;
760
761   pthread_cond_signal(&queue_cond);
762   pthread_mutex_lock (&stats_lock);
763   stats_queue_length++;
764   pthread_mutex_unlock (&stats_lock);
765
766   return (0);
767 } /* }}} int enqueue_cache_item */
768
769 /*
770  * tree_callback_flush:
771  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
772  * while this is in progress.
773  */
774 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
775     gpointer data)
776 {
777   cache_item_t *ci;
778   callback_flush_data_t *cfd;
779
780   ci = (cache_item_t *) value;
781   cfd = (callback_flush_data_t *) data;
782
783   if (ci->flags & CI_FLAGS_IN_QUEUE)
784     return FALSE;
785
786   if (ci->values_num > 0
787       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
788   {
789     enqueue_cache_item (ci, TAIL);
790   }
791   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
792       && (ci->values_num <= 0))
793   {
794     assert ((char *) key == ci->file);
795     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
796     {
797       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
798       return (FALSE);
799     }
800   }
801
802   return (FALSE);
803 } /* }}} gboolean tree_callback_flush */
804
805 static int flush_old_values (int max_age)
806 {
807   callback_flush_data_t cfd;
808   size_t k;
809
810   memset (&cfd, 0, sizeof (cfd));
811   /* Pass the current time as user data so that we don't need to call
812    * `time' for each node. */
813   cfd.now = time (NULL);
814   cfd.keys = NULL;
815   cfd.keys_num = 0;
816
817   if (max_age > 0)
818     cfd.abs_timeout = cfd.now - max_age;
819   else
820     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821
822   /* `tree_callback_flush' will return the keys of all values that haven't
823    * been touched in the last `config_flush_interval' seconds in `cfd'.
824    * The char*'s in this array point to the same memory as ci->file, so we
825    * don't need to free them separately. */
826   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827
828   for (k = 0; k < cfd.keys_num; k++)
829   {
830     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
831     /* should never fail, since we have held the cache_lock
832      * the entire time */
833     assert(status == TRUE);
834   }
835
836   if (cfd.keys != NULL)
837   {
838     free (cfd.keys);
839     cfd.keys = NULL;
840   }
841
842   return (0);
843 } /* int flush_old_values */
844
845 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
846 {
847   struct timeval now;
848   struct timespec next_flush;
849   int status;
850
851   gettimeofday (&now, NULL);
852   next_flush.tv_sec = now.tv_sec + config_flush_interval;
853   next_flush.tv_nsec = 1000 * now.tv_usec;
854
855   pthread_mutex_lock(&cache_lock);
856
857   while (state == RUNNING)
858   {
859     gettimeofday (&now, NULL);
860     if ((now.tv_sec > next_flush.tv_sec)
861         || ((now.tv_sec == next_flush.tv_sec)
862           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
863     {
864       RRDD_LOG(LOG_DEBUG, "flushing old values");
865
866       /* Determine the time of the next cache flush. */
867       next_flush.tv_sec = now.tv_sec + config_flush_interval;
868
869       /* Flush all values that haven't been written in the last
870        * `config_write_interval' seconds. */
871       flush_old_values (config_write_interval);
872
873       /* unlock the cache while we rotate so we don't block incoming
874        * updates if the fsync() blocks on disk I/O */
875       pthread_mutex_unlock(&cache_lock);
876       journal_rotate();
877       pthread_mutex_lock(&cache_lock);
878     }
879
880     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
881     if (status != 0 && status != ETIMEDOUT)
882     {
883       RRDD_LOG (LOG_ERR, "flush_thread_main: "
884                 "pthread_cond_timedwait returned %i.", status);
885     }
886   }
887
888   if (config_flush_at_shutdown)
889     flush_old_values (-1); /* flush everything */
890
891   state = SHUTDOWN;
892
893   pthread_mutex_unlock(&cache_lock);
894
895   return NULL;
896 } /* void *flush_thread_main */
897
898 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
899 {
900   pthread_mutex_lock (&cache_lock);
901
902   while (state != SHUTDOWN
903          || (cache_queue_head != NULL && config_flush_at_shutdown))
904   {
905     cache_item_t *ci;
906     char *file;
907     char **values;
908     size_t values_num;
909     int status;
910
911     /* Now, check if there's something to store away. If not, wait until
912      * something comes in. */
913     if (cache_queue_head == NULL)
914     {
915       status = pthread_cond_wait (&queue_cond, &cache_lock);
916       if ((status != 0) && (status != ETIMEDOUT))
917       {
918         RRDD_LOG (LOG_ERR, "queue_thread_main: "
919             "pthread_cond_wait returned %i.", status);
920       }
921     }
922
923     /* Check if a value has arrived. This may be NULL if we timed out or there
924      * was an interrupt such as a signal. */
925     if (cache_queue_head == NULL)
926       continue;
927
928     ci = cache_queue_head;
929
930     /* copy the relevant parts */
931     file = strdup (ci->file);
932     if (file == NULL)
933     {
934       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
935       continue;
936     }
937
938     assert(ci->values != NULL);
939     assert(ci->values_num > 0);
940
941     values = ci->values;
942     values_num = ci->values_num;
943
944     wipe_ci_values(ci, time(NULL));
945     remove_from_queue(ci);
946
947     pthread_mutex_unlock (&cache_lock);
948
949     rrd_clear_error ();
950     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
951     if (status != 0)
952     {
953       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
954           "rrd_update_r (%s) failed with status %i. (%s)",
955           file, status, rrd_get_error());
956     }
957
958     journal_write("wrote", file);
959
960     /* Search again in the tree.  It's possible someone issued a "FORGET"
961      * while we were writing the update values. */
962     pthread_mutex_lock(&cache_lock);
963     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
964     if (ci)
965       pthread_cond_broadcast(&ci->flushed);
966     pthread_mutex_unlock(&cache_lock);
967
968     if (status == 0)
969     {
970       pthread_mutex_lock (&stats_lock);
971       stats_updates_written++;
972       stats_data_sets_written += values_num;
973       pthread_mutex_unlock (&stats_lock);
974     }
975
976     rrd_free_ptrs((void ***) &values, &values_num);
977     free(file);
978
979     pthread_mutex_lock (&cache_lock);
980   }
981   pthread_mutex_unlock (&cache_lock);
982
983   return (NULL);
984 } /* }}} void *queue_thread_main */
985
986 static int buffer_get_field (char **buffer_ret, /* {{{ */
987     size_t *buffer_size_ret, char **field_ret)
988 {
989   char *buffer;
990   size_t buffer_pos;
991   size_t buffer_size;
992   char *field;
993   size_t field_size;
994   int status;
995
996   buffer = *buffer_ret;
997   buffer_pos = 0;
998   buffer_size = *buffer_size_ret;
999   field = *buffer_ret;
1000   field_size = 0;
1001
1002   if (buffer_size <= 0)
1003     return (-1);
1004
1005   /* This is ensured by `handle_request'. */
1006   assert (buffer[buffer_size - 1] == '\0');
1007
1008   status = -1;
1009   while (buffer_pos < buffer_size)
1010   {
1011     /* Check for end-of-field or end-of-buffer */
1012     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1013     {
1014       field[field_size] = 0;
1015       field_size++;
1016       buffer_pos++;
1017       status = 0;
1018       break;
1019     }
1020     /* Handle escaped characters. */
1021     else if (buffer[buffer_pos] == '\\')
1022     {
1023       if (buffer_pos >= (buffer_size - 1))
1024         break;
1025       buffer_pos++;
1026       field[field_size] = buffer[buffer_pos];
1027       field_size++;
1028       buffer_pos++;
1029     }
1030     /* Normal operation */ 
1031     else
1032     {
1033       field[field_size] = buffer[buffer_pos];
1034       field_size++;
1035       buffer_pos++;
1036     }
1037   } /* while (buffer_pos < buffer_size) */
1038
1039   if (status != 0)
1040     return (status);
1041
1042   *buffer_ret = buffer + buffer_pos;
1043   *buffer_size_ret = buffer_size - buffer_pos;
1044   *field_ret = field;
1045
1046   return (0);
1047 } /* }}} int buffer_get_field */
1048
1049 /* if we're restricting writes to the base directory,
1050  * check whether the file falls within the dir
1051  * returns 1 if OK, otherwise 0
1052  */
1053 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1054 {
1055   assert(file != NULL);
1056
1057   if (!config_write_base_only
1058       || JOURNAL_REPLAY(sock)
1059       || config_base_dir == NULL)
1060     return 1;
1061
1062   if (strstr(file, "../") != NULL) goto err;
1063
1064   /* relative paths without "../" are ok */
1065   if (*file != '/') return 1;
1066
1067   /* file must be of the format base + "/" + <1+ char filename> */
1068   if (strlen(file) < _config_base_dir_len + 2) goto err;
1069   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1070   if (*(file + _config_base_dir_len) != '/') goto err;
1071
1072   return 1;
1073
1074 err:
1075   if (sock != NULL && sock->fd >= 0)
1076     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1077
1078   return 0;
1079 } /* }}} static int check_file_access */
1080
1081 /* when using a base dir, convert relative paths to absolute paths.
1082  * if necessary, modifies the "filename" pointer to point
1083  * to the new path created in "tmp".  "tmp" is provided
1084  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1085  *
1086  * this allows us to optimize for the expected case (absolute path)
1087  * with a no-op.
1088  */
1089 static void get_abs_path(char **filename, char *tmp)
1090 {
1091   assert(tmp != NULL);
1092   assert(filename != NULL && *filename != NULL);
1093
1094   if (config_base_dir == NULL || **filename == '/')
1095     return;
1096
1097   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1098   *filename = tmp;
1099 } /* }}} static int get_abs_path */
1100
1101 static int flush_file (const char *filename) /* {{{ */
1102 {
1103   cache_item_t *ci;
1104
1105   pthread_mutex_lock (&cache_lock);
1106
1107   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1108   if (ci == NULL)
1109   {
1110     pthread_mutex_unlock (&cache_lock);
1111     return (ENOENT);
1112   }
1113
1114   if (ci->values_num > 0)
1115   {
1116     /* Enqueue at head */
1117     enqueue_cache_item (ci, HEAD);
1118     pthread_cond_wait(&ci->flushed, &cache_lock);
1119   }
1120
1121   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1122    * may have been purged during our cond_wait() */
1123
1124   pthread_mutex_unlock(&cache_lock);
1125
1126   return (0);
1127 } /* }}} int flush_file */
1128
1129 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1130 {
1131   char *err = "Syntax error.\n";
1132
1133   if (cmd && cmd->syntax)
1134     err = cmd->syntax;
1135
1136   return send_response(sock, RESP_ERR, "Usage: %s", err);
1137 } /* }}} static int syntax_error() */
1138
1139 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1140 {
1141   uint64_t copy_queue_length;
1142   uint64_t copy_updates_received;
1143   uint64_t copy_flush_received;
1144   uint64_t copy_updates_written;
1145   uint64_t copy_data_sets_written;
1146   uint64_t copy_journal_bytes;
1147   uint64_t copy_journal_rotate;
1148
1149   uint64_t tree_nodes_number;
1150   uint64_t tree_depth;
1151
1152   pthread_mutex_lock (&stats_lock);
1153   copy_queue_length       = stats_queue_length;
1154   copy_updates_received   = stats_updates_received;
1155   copy_flush_received     = stats_flush_received;
1156   copy_updates_written    = stats_updates_written;
1157   copy_data_sets_written  = stats_data_sets_written;
1158   copy_journal_bytes      = stats_journal_bytes;
1159   copy_journal_rotate     = stats_journal_rotate;
1160   pthread_mutex_unlock (&stats_lock);
1161
1162   pthread_mutex_lock (&cache_lock);
1163   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1164   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1165   pthread_mutex_unlock (&cache_lock);
1166
1167   add_response_info(sock,
1168                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1169   add_response_info(sock,
1170                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1171   add_response_info(sock,
1172                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1173   add_response_info(sock,
1174                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1175   add_response_info(sock,
1176                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1177   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1178   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1179   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1180   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181
1182   send_response(sock, RESP_OK, "Statistics follow\n");
1183
1184   return (0);
1185 } /* }}} int handle_request_stats */
1186
1187 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1188 {
1189   char *file, file_tmp[PATH_MAX];
1190   int status;
1191
1192   status = buffer_get_field (&buffer, &buffer_size, &file);
1193   if (status != 0)
1194   {
1195     return syntax_error(sock,cmd);
1196   }
1197   else
1198   {
1199     pthread_mutex_lock(&stats_lock);
1200     stats_flush_received++;
1201     pthread_mutex_unlock(&stats_lock);
1202
1203     get_abs_path(&file, file_tmp);
1204     if (!check_file_access(file, sock)) return 0;
1205
1206     status = flush_file (file);
1207     if (status == 0)
1208       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1209     else if (status == ENOENT)
1210     {
1211       /* no file in our tree; see whether it exists at all */
1212       struct stat statbuf;
1213
1214       memset(&statbuf, 0, sizeof(statbuf));
1215       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1216         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1217       else
1218         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1219     }
1220     else if (status < 0)
1221       return send_response(sock, RESP_ERR, "Internal error.\n");
1222     else
1223       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1224   }
1225
1226   /* NOTREACHED */
1227   assert(1==0);
1228 } /* }}} int handle_request_flush */
1229
1230 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1231 {
1232   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233
1234   pthread_mutex_lock(&cache_lock);
1235   flush_old_values(-1);
1236   pthread_mutex_unlock(&cache_lock);
1237
1238   return send_response(sock, RESP_OK, "Started flush.\n");
1239 } /* }}} static int handle_request_flushall */
1240
1241 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1242 {
1243   int status;
1244   char *file, file_tmp[PATH_MAX];
1245   cache_item_t *ci;
1246
1247   status = buffer_get_field(&buffer, &buffer_size, &file);
1248   if (status != 0)
1249     return syntax_error(sock,cmd);
1250
1251   get_abs_path(&file, file_tmp);
1252
1253   pthread_mutex_lock(&cache_lock);
1254   ci = g_tree_lookup(cache_tree, file);
1255   if (ci == NULL)
1256   {
1257     pthread_mutex_unlock(&cache_lock);
1258     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1259   }
1260
1261   for (size_t i=0; i < ci->values_num; i++)
1262     add_response_info(sock, "%s\n", ci->values[i]);
1263
1264   pthread_mutex_unlock(&cache_lock);
1265   return send_response(sock, RESP_OK, "updates pending\n");
1266 } /* }}} static int handle_request_pending */
1267
1268 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1269 {
1270   int status;
1271   gboolean found;
1272   char *file, file_tmp[PATH_MAX];
1273
1274   status = buffer_get_field(&buffer, &buffer_size, &file);
1275   if (status != 0)
1276     return syntax_error(sock,cmd);
1277
1278   get_abs_path(&file, file_tmp);
1279   if (!check_file_access(file, sock)) return 0;
1280
1281   pthread_mutex_lock(&cache_lock);
1282   found = g_tree_remove(cache_tree, file);
1283   pthread_mutex_unlock(&cache_lock);
1284
1285   if (found == TRUE)
1286   {
1287     if (!JOURNAL_REPLAY(sock))
1288       journal_write("forget", file);
1289
1290     return send_response(sock, RESP_OK, "Gone!\n");
1291   }
1292   else
1293     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1294
1295   /* NOTREACHED */
1296   assert(1==0);
1297 } /* }}} static int handle_request_forget */
1298
1299 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1300 {
1301   cache_item_t *ci;
1302
1303   pthread_mutex_lock(&cache_lock);
1304
1305   ci = cache_queue_head;
1306   while (ci != NULL)
1307   {
1308     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1309     ci = ci->next;
1310   }
1311
1312   pthread_mutex_unlock(&cache_lock);
1313
1314   return send_response(sock, RESP_OK, "in queue.\n");
1315 } /* }}} int handle_request_queue */
1316
1317 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1318 {
1319   char *file, file_tmp[PATH_MAX];
1320   int values_num = 0;
1321   int status;
1322   char orig_buf[CMD_MAX];
1323
1324   cache_item_t *ci;
1325
1326   /* save it for the journal later */
1327   if (!JOURNAL_REPLAY(sock))
1328     strncpy(orig_buf, buffer, buffer_size);
1329
1330   status = buffer_get_field (&buffer, &buffer_size, &file);
1331   if (status != 0)
1332     return syntax_error(sock,cmd);
1333
1334   pthread_mutex_lock(&stats_lock);
1335   stats_updates_received++;
1336   pthread_mutex_unlock(&stats_lock);
1337
1338   get_abs_path(&file, file_tmp);
1339   if (!check_file_access(file, sock)) return 0;
1340
1341   pthread_mutex_lock (&cache_lock);
1342   ci = g_tree_lookup (cache_tree, file);
1343
1344   if (ci == NULL) /* {{{ */
1345   {
1346     struct stat statbuf;
1347     cache_item_t *tmp;
1348
1349     /* don't hold the lock while we setup; stat(2) might block */
1350     pthread_mutex_unlock(&cache_lock);
1351
1352     memset (&statbuf, 0, sizeof (statbuf));
1353     status = stat (file, &statbuf);
1354     if (status != 0)
1355     {
1356       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357
1358       status = errno;
1359       if (status == ENOENT)
1360         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1361       else
1362         return send_response(sock, RESP_ERR,
1363                              "stat failed with error %i.\n", status);
1364     }
1365     if (!S_ISREG (statbuf.st_mode))
1366       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367
1368     if (access(file, R_OK|W_OK) != 0)
1369       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1370                            file, rrd_strerror(errno));
1371
1372     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1373     if (ci == NULL)
1374     {
1375       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376
1377       return send_response(sock, RESP_ERR, "malloc failed.\n");
1378     }
1379     memset (ci, 0, sizeof (cache_item_t));
1380
1381     ci->file = strdup (file);
1382     if (ci->file == NULL)
1383     {
1384       free (ci);
1385       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386
1387       return send_response(sock, RESP_ERR, "strdup failed.\n");
1388     }
1389
1390     wipe_ci_values(ci, now);
1391     ci->flags = CI_FLAGS_IN_TREE;
1392     pthread_cond_init(&ci->flushed, NULL);
1393
1394     pthread_mutex_lock(&cache_lock);
1395
1396     /* another UPDATE might have added this entry in the meantime */
1397     tmp = g_tree_lookup (cache_tree, file);
1398     if (tmp == NULL)
1399       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1400     else
1401     {
1402       free_cache_item (ci);
1403       ci = tmp;
1404     }
1405
1406     /* state may have changed while we were unlocked */
1407     if (state == SHUTDOWN)
1408       return -1;
1409   } /* }}} */
1410   assert (ci != NULL);
1411
1412   /* don't re-write updates in replay mode */
1413   if (!JOURNAL_REPLAY(sock))
1414     journal_write("update", orig_buf);
1415
1416   while (buffer_size > 0)
1417   {
1418     char *value;
1419     time_t stamp;
1420     char *eostamp;
1421
1422     status = buffer_get_field (&buffer, &buffer_size, &value);
1423     if (status != 0)
1424     {
1425       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1426       break;
1427     }
1428
1429     /* make sure update time is always moving forward */
1430     stamp = strtol(value, &eostamp, 10);
1431     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1432     {
1433       pthread_mutex_unlock(&cache_lock);
1434       return send_response(sock, RESP_ERR,
1435                            "Cannot find timestamp in '%s'!\n", value);
1436     }
1437     else if (stamp <= ci->last_update_stamp)
1438     {
1439       pthread_mutex_unlock(&cache_lock);
1440       return send_response(sock, RESP_ERR,
1441                            "illegal attempt to update using time %ld when last"
1442                            " update time is %ld (minimum one second step)\n",
1443                            stamp, ci->last_update_stamp);
1444     }
1445     else
1446       ci->last_update_stamp = stamp;
1447
1448     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1449                               &ci->values_alloc, config_alloc_chunk))
1450     {
1451       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1452       continue;
1453     }
1454
1455     values_num++;
1456   }
1457
1458   if (((now - ci->last_flush_time) >= config_write_interval)
1459       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1460       && (ci->values_num > 0))
1461   {
1462     enqueue_cache_item (ci, TAIL);
1463   }
1464
1465   pthread_mutex_unlock (&cache_lock);
1466
1467   if (values_num < 1)
1468     return send_response(sock, RESP_ERR, "No values updated.\n");
1469   else
1470     return send_response(sock, RESP_OK,
1471                          "errors, enqueued %i value(s).\n", values_num);
1472
1473   /* NOTREACHED */
1474   assert(1==0);
1475
1476 } /* }}} int handle_request_update */
1477
1478 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1479 {
1480   char *file, file_tmp[PATH_MAX];
1481   char *cf;
1482
1483   char *start_str;
1484   char *end_str;
1485   time_t start_tm;
1486   time_t end_tm;
1487
1488   unsigned long step;
1489   unsigned long ds_cnt;
1490   char **ds_namv;
1491   rrd_value_t *data;
1492
1493   int status;
1494   unsigned long i;
1495   time_t t;
1496   rrd_value_t *data_ptr;
1497
1498   file = NULL;
1499   cf = NULL;
1500   start_str = NULL;
1501   end_str = NULL;
1502
1503   /* Read the arguments */
1504   do /* while (0) */
1505   {
1506     status = buffer_get_field (&buffer, &buffer_size, &file);
1507     if (status != 0)
1508       break;
1509
1510     status = buffer_get_field (&buffer, &buffer_size, &cf);
1511     if (status != 0)
1512       break;
1513
1514     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1515     if (status != 0)
1516     {
1517       start_str = NULL;
1518       status = 0;
1519       break;
1520     }
1521
1522     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1523     if (status != 0)
1524     {
1525       end_str = NULL;
1526       status = 0;
1527       break;
1528     }
1529   } while (0);
1530
1531   if (status != 0)
1532     return (syntax_error(sock,cmd));
1533
1534   get_abs_path(&file, file_tmp);
1535   if (!check_file_access(file, sock)) return 0;
1536
1537   status = flush_file (file);
1538   if ((status != 0) && (status != ENOENT))
1539     return (send_response (sock, RESP_ERR,
1540           "flush_file (%s) failed with status %i.\n", file, status));
1541
1542   t = time (NULL); /* "now" */
1543
1544   /* Parse start time */
1545   if (start_str != NULL)
1546   {
1547     char *endptr;
1548     long value;
1549
1550     endptr = NULL;
1551     errno = 0;
1552     value = strtol (start_str, &endptr, /* base = */ 0);
1553     if ((endptr == start_str) || (errno != 0))
1554       return (send_response(sock, RESP_ERR,
1555             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1556             start_str));
1557
1558     if (value > 0)
1559       start_tm = (time_t) value;
1560     else
1561       start_tm = (time_t) (t + value);
1562   }
1563   else
1564   {
1565     start_tm = t - 86400;
1566   }
1567
1568   /* Parse end time */
1569   if (end_str != NULL)
1570   {
1571     char *endptr;
1572     long value;
1573
1574     endptr = NULL;
1575     errno = 0;
1576     value = strtol (end_str, &endptr, /* base = */ 0);
1577     if ((endptr == end_str) || (errno != 0))
1578       return (send_response(sock, RESP_ERR,
1579             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1580             end_str));
1581
1582     if (value > 0)
1583       end_tm = (time_t) value;
1584     else
1585       end_tm = (time_t) (t + value);
1586   }
1587   else
1588   {
1589     end_tm = t;
1590   }
1591
1592   step = -1;
1593   ds_cnt = 0;
1594   ds_namv = NULL;
1595   data = NULL;
1596
1597   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1598       &ds_cnt, &ds_namv, &data);
1599   if (status != 0)
1600     return (send_response(sock, RESP_ERR,
1601           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1602
1603   add_response_info (sock, "FlushVersion: %lu\n", 1);
1604   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1605   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1606   add_response_info (sock, "Step: %lu\n", step);
1607   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1608
1609 #define SSTRCAT(buffer,str,buffer_fill) do { \
1610     size_t str_len = strlen (str); \
1611     if ((buffer_fill + str_len) > sizeof (buffer)) \
1612       str_len = sizeof (buffer) - buffer_fill; \
1613     if (str_len > 0) { \
1614       strncpy (buffer + buffer_fill, str, str_len); \
1615       buffer_fill += str_len; \
1616       assert (buffer_fill <= sizeof (buffer)); \
1617       if (buffer_fill == sizeof (buffer)) \
1618         buffer[buffer_fill - 1] = 0; \
1619       else \
1620         buffer[buffer_fill] = 0; \
1621     } \
1622   } while (0)
1623
1624   { /* Add list of DS names */
1625     char linebuf[1024];
1626     size_t linebuf_fill;
1627
1628     memset (linebuf, 0, sizeof (linebuf));
1629     linebuf_fill = 0;
1630     for (i = 0; i < ds_cnt; i++)
1631     {
1632       if (i > 0)
1633         SSTRCAT (linebuf, " ", linebuf_fill);
1634       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1635       rrd_freemem(ds_namv[i]);
1636     }
1637     rrd_freemem(ds_namv);
1638     add_response_info (sock, "DSName: %s\n", linebuf);
1639   }
1640
1641   /* Add the actual data */
1642   assert (step > 0);
1643   data_ptr = data;
1644   for (t = start_tm + step; t <= end_tm; t += step)
1645   {
1646     char linebuf[1024];
1647     size_t linebuf_fill;
1648     char tmp[128];
1649
1650     memset (linebuf, 0, sizeof (linebuf));
1651     linebuf_fill = 0;
1652     for (i = 0; i < ds_cnt; i++)
1653     {
1654       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1655       tmp[sizeof (tmp) - 1] = 0;
1656       SSTRCAT (linebuf, tmp, linebuf_fill);
1657
1658       data_ptr++;
1659     }
1660
1661     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1662   } /* for (t) */
1663   rrd_freemem(data);
1664
1665   return (send_response (sock, RESP_OK, "Success\n"));
1666 #undef SSTRCAT
1667 } /* }}} int handle_request_fetch */
1668
1669 /* we came across a "WROTE" entry during journal replay.
1670  * throw away any values that we have accumulated for this file
1671  */
1672 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1673 {
1674   cache_item_t *ci;
1675   const char *file = buffer;
1676
1677   pthread_mutex_lock(&cache_lock);
1678
1679   ci = g_tree_lookup(cache_tree, file);
1680   if (ci == NULL)
1681   {
1682     pthread_mutex_unlock(&cache_lock);
1683     return (0);
1684   }
1685
1686   if (ci->values)
1687     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1688
1689   wipe_ci_values(ci, now);
1690   remove_from_queue(ci);
1691
1692   pthread_mutex_unlock(&cache_lock);
1693   return (0);
1694 } /* }}} int handle_request_wrote */
1695
1696 /* start "BATCH" processing */
1697 static int batch_start (HANDLER_PROTO) /* {{{ */
1698 {
1699   int status;
1700   if (sock->batch_start)
1701     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1702
1703   status = send_response(sock, RESP_OK,
1704                          "Go ahead.  End with dot '.' on its own line.\n");
1705   sock->batch_start = time(NULL);
1706   sock->batch_cmd = 0;
1707
1708   return status;
1709 } /* }}} static int batch_start */
1710
1711 /* finish "BATCH" processing and return results to the client */
1712 static int batch_done (HANDLER_PROTO) /* {{{ */
1713 {
1714   assert(sock->batch_start);
1715   sock->batch_start = 0;
1716   sock->batch_cmd  = 0;
1717   return send_response(sock, RESP_OK, "errors\n");
1718 } /* }}} static int batch_done */
1719
1720 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1721 {
1722   return -1;
1723 } /* }}} static int handle_request_quit */
1724
1725 static command_t list_of_commands[] = { /* {{{ */
1726   {
1727     "UPDATE",
1728     handle_request_update,
1729     CMD_CONTEXT_ANY,
1730     "UPDATE <filename> <values> [<values> ...]\n"
1731     ,
1732     "Adds the given file to the internal cache if it is not yet known and\n"
1733     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1734     "for details.\n"
1735     "\n"
1736     "Each <values> has the following form:\n"
1737     "  <values> = <time>:<value>[:<value>[...]]\n"
1738     "See the rrdupdate(1) manpage for details.\n"
1739   },
1740   {
1741     "WROTE",
1742     handle_request_wrote,
1743     CMD_CONTEXT_JOURNAL,
1744     NULL,
1745     NULL
1746   },
1747   {
1748     "FLUSH",
1749     handle_request_flush,
1750     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1751     "FLUSH <filename>\n"
1752     ,
1753     "Adds the given filename to the head of the update queue and returns\n"
1754     "after it has been dequeued.\n"
1755   },
1756   {
1757     "FLUSHALL",
1758     handle_request_flushall,
1759     CMD_CONTEXT_CLIENT,
1760     "FLUSHALL\n"
1761     ,
1762     "Triggers writing of all pending updates.  Returns immediately.\n"
1763   },
1764   {
1765     "PENDING",
1766     handle_request_pending,
1767     CMD_CONTEXT_CLIENT,
1768     "PENDING <filename>\n"
1769     ,
1770     "Shows any 'pending' updates for a file, in order.\n"
1771     "The updates shown have not yet been written to the underlying RRD file.\n"
1772   },
1773   {
1774     "FORGET",
1775     handle_request_forget,
1776     CMD_CONTEXT_ANY,
1777     "FORGET <filename>\n"
1778     ,
1779     "Removes the file completely from the cache.\n"
1780     "Any pending updates for the file will be lost.\n"
1781   },
1782   {
1783     "QUEUE",
1784     handle_request_queue,
1785     CMD_CONTEXT_CLIENT,
1786     "QUEUE\n"
1787     ,
1788         "Shows all files in the output queue.\n"
1789     "The output is zero or more lines in the following format:\n"
1790     "(where <num_vals> is the number of values to be written)\n"
1791     "\n"
1792     "<num_vals> <filename>\n"
1793   },
1794   {
1795     "STATS",
1796     handle_request_stats,
1797     CMD_CONTEXT_CLIENT,
1798     "STATS\n"
1799     ,
1800     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1801     "a description of the values.\n"
1802   },
1803   {
1804     "HELP",
1805     handle_request_help,
1806     CMD_CONTEXT_CLIENT,
1807     "HELP [<command>]\n",
1808     NULL, /* special! */
1809   },
1810   {
1811     "BATCH",
1812     batch_start,
1813     CMD_CONTEXT_CLIENT,
1814     "BATCH\n"
1815     ,
1816     "The 'BATCH' command permits the client to initiate a bulk load\n"
1817     "   of commands to rrdcached.\n"
1818     "\n"
1819     "Usage:\n"
1820     "\n"
1821     "    client: BATCH\n"
1822     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1823     "    client: command #1\n"
1824     "    client: command #2\n"
1825     "    client: ... and so on\n"
1826     "    client: .\n"
1827     "    server: 2 errors\n"
1828     "    server: 7 message for command #7\n"
1829     "    server: 9 message for command #9\n"
1830     "\n"
1831     "For more information, consult the rrdcached(1) documentation.\n"
1832   },
1833   {
1834     ".",   /* BATCH terminator */
1835     batch_done,
1836     CMD_CONTEXT_BATCH,
1837     NULL,
1838     NULL
1839   },
1840   {
1841     "FETCH",
1842     handle_request_fetch,
1843     CMD_CONTEXT_CLIENT,
1844     "FETCH <file> <CF> [<start> [<end>]]\n"
1845     ,
1846     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1847   },
1848   {
1849     "QUIT",
1850     handle_request_quit,
1851     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1852     "QUIT\n"
1853     ,
1854     "Disconnect from rrdcached.\n"
1855   }
1856 }; /* }}} command_t list_of_commands[] */
1857 static size_t list_of_commands_len = sizeof (list_of_commands)
1858   / sizeof (list_of_commands[0]);
1859
1860 static command_t *find_command(char *cmd)
1861 {
1862   size_t i;
1863
1864   for (i = 0; i < list_of_commands_len; i++)
1865     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1866       return (&list_of_commands[i]);
1867   return NULL;
1868 }
1869
1870 /* We currently use the index in the `list_of_commands' array as a bit position
1871  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1872  * outside these functions so that switching to a more elegant storage method
1873  * is easily possible. */
1874 static ssize_t find_command_index (const char *cmd) /* {{{ */
1875 {
1876   size_t i;
1877
1878   for (i = 0; i < list_of_commands_len; i++)
1879     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1880       return ((ssize_t) i);
1881   return (-1);
1882 } /* }}} ssize_t find_command_index */
1883
1884 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1885     const char *cmd)
1886 {
1887   ssize_t i;
1888
1889   if (JOURNAL_REPLAY(sock))
1890     return (1);
1891
1892   if (cmd == NULL)
1893     return (-1);
1894
1895   if ((strcasecmp ("QUIT", cmd) == 0)
1896       || (strcasecmp ("HELP", cmd) == 0))
1897     return (1);
1898   else if (strcmp (".", cmd) == 0)
1899     cmd = "BATCH";
1900
1901   i = find_command_index (cmd);
1902   if (i < 0)
1903     return (-1);
1904   assert (i < 32);
1905
1906   if ((sock->permissions & (1 << i)) != 0)
1907     return (1);
1908   return (0);
1909 } /* }}} int socket_permission_check */
1910
1911 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1912     const char *cmd)
1913 {
1914   ssize_t i;
1915
1916   i = find_command_index (cmd);
1917   if (i < 0)
1918     return (-1);
1919   assert (i < 32);
1920
1921   sock->permissions |= (1 << i);
1922   return (0);
1923 } /* }}} int socket_permission_add */
1924
1925 /* check whether commands are received in the expected context */
1926 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1927 {
1928   if (JOURNAL_REPLAY(sock))
1929     return (cmd->context & CMD_CONTEXT_JOURNAL);
1930   else if (sock->batch_start)
1931     return (cmd->context & CMD_CONTEXT_BATCH);
1932   else
1933     return (cmd->context & CMD_CONTEXT_CLIENT);
1934
1935   /* NOTREACHED */
1936   assert(1==0);
1937 }
1938
1939 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1940 {
1941   int status;
1942   char *cmd_str;
1943   char *resp_txt;
1944   command_t *help = NULL;
1945
1946   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1947   if (status == 0)
1948     help = find_command(cmd_str);
1949
1950   if (help && (help->syntax || help->help))
1951   {
1952     char tmp[CMD_MAX];
1953
1954     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1955     resp_txt = tmp;
1956
1957     if (help->syntax)
1958       add_response_info(sock, "Usage: %s\n", help->syntax);
1959
1960     if (help->help)
1961       add_response_info(sock, "%s\n", help->help);
1962   }
1963   else
1964   {
1965     size_t i;
1966
1967     resp_txt = "Command overview\n";
1968
1969     for (i = 0; i < list_of_commands_len; i++)
1970     {
1971       if (list_of_commands[i].syntax == NULL)
1972         continue;
1973       add_response_info (sock, "%s", list_of_commands[i].syntax);
1974     }
1975   }
1976
1977   return send_response(sock, RESP_OK, resp_txt);
1978 } /* }}} int handle_request_help */
1979
1980 static int handle_request (DISPATCH_PROTO) /* {{{ */
1981 {
1982   char *buffer_ptr = buffer;
1983   char *cmd_str = NULL;
1984   command_t *cmd = NULL;
1985   int status;
1986
1987   assert (buffer[buffer_size - 1] == '\0');
1988
1989   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1990   if (status != 0)
1991   {
1992     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1993     return (-1);
1994   }
1995
1996   if (sock != NULL && sock->batch_start)
1997     sock->batch_cmd++;
1998
1999   cmd = find_command(cmd_str);
2000   if (!cmd)
2001     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2002
2003   if (!socket_permission_check (sock, cmd->cmd))
2004     return send_response(sock, RESP_ERR, "Permission denied.\n");
2005
2006   if (!command_check_context(sock, cmd))
2007     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2008
2009   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2010 } /* }}} int handle_request */
2011
2012 static void journal_set_free (journal_set *js) /* {{{ */
2013 {
2014   if (js == NULL)
2015     return;
2016
2017   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2018
2019   free(js);
2020 } /* }}} journal_set_free */
2021
2022 static void journal_set_remove (journal_set *js) /* {{{ */
2023 {
2024   if (js == NULL)
2025     return;
2026
2027   for (uint i=0; i < js->files_num; i++)
2028   {
2029     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2030     unlink(js->files[i]);
2031   }
2032 } /* }}} journal_set_remove */
2033
2034 /* close current journal file handle.
2035  * MUST hold journal_lock before calling */
2036 static void journal_close(void) /* {{{ */
2037 {
2038   if (journal_fh != NULL)
2039   {
2040     if (fclose(journal_fh) != 0)
2041       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2042   }
2043
2044   journal_fh = NULL;
2045   journal_size = 0;
2046 } /* }}} journal_close */
2047
2048 /* MUST hold journal_lock before calling */
2049 static void journal_new_file(void) /* {{{ */
2050 {
2051   struct timeval now;
2052   int  new_fd;
2053   char new_file[PATH_MAX + 1];
2054
2055   assert(journal_dir != NULL);
2056   assert(journal_cur != NULL);
2057
2058   journal_close();
2059
2060   gettimeofday(&now, NULL);
2061   /* this format assures that the files sort in strcmp() order */
2062   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2063            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2064
2065   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2066                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2067   if (new_fd < 0)
2068     goto error;
2069
2070   journal_fh = fdopen(new_fd, "a");
2071   if (journal_fh == NULL)
2072     goto error;
2073
2074   journal_size = ftell(journal_fh);
2075   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2076
2077   /* record the file in the journal set */
2078   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2079
2080   return;
2081
2082 error:
2083   RRDD_LOG(LOG_CRIT,
2084            "JOURNALING DISABLED: Error while trying to create %s : %s",
2085            new_file, rrd_strerror(errno));
2086   RRDD_LOG(LOG_CRIT,
2087            "JOURNALING DISABLED: All values will be flushed at shutdown");
2088
2089   close(new_fd);
2090   config_flush_at_shutdown = 1;
2091
2092 } /* }}} journal_new_file */
2093
2094 /* MUST NOT hold journal_lock before calling this */
2095 static void journal_rotate(void) /* {{{ */
2096 {
2097   journal_set *old_js = NULL;
2098
2099   if (journal_dir == NULL)
2100     return;
2101
2102   RRDD_LOG(LOG_DEBUG, "rotating journals");
2103
2104   pthread_mutex_lock(&stats_lock);
2105   ++stats_journal_rotate;
2106   pthread_mutex_unlock(&stats_lock);
2107
2108   pthread_mutex_lock(&journal_lock);
2109
2110   journal_close();
2111
2112   /* rotate the journal sets */
2113   old_js = journal_old;
2114   journal_old = journal_cur;
2115   journal_cur = calloc(1, sizeof(journal_set));
2116
2117   if (journal_cur != NULL)
2118     journal_new_file();
2119   else
2120     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2121
2122   pthread_mutex_unlock(&journal_lock);
2123
2124   journal_set_remove(old_js);
2125   journal_set_free  (old_js);
2126
2127 } /* }}} static void journal_rotate */
2128
2129 /* MUST hold journal_lock when calling */
2130 static void journal_done(void) /* {{{ */
2131 {
2132   if (journal_cur == NULL)
2133     return;
2134
2135   journal_close();
2136
2137   if (config_flush_at_shutdown)
2138   {
2139     RRDD_LOG(LOG_INFO, "removing journals");
2140     journal_set_remove(journal_old);
2141     journal_set_remove(journal_cur);
2142   }
2143   else
2144   {
2145     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2146              "journals will be used at next startup");
2147   }
2148
2149   journal_set_free(journal_cur);
2150   journal_set_free(journal_old);
2151   free(journal_dir);
2152
2153 } /* }}} static void journal_done */
2154
2155 static int journal_write(char *cmd, char *args) /* {{{ */
2156 {
2157   int chars;
2158
2159   if (journal_fh == NULL)
2160     return 0;
2161
2162   pthread_mutex_lock(&journal_lock);
2163   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2164   journal_size += chars;
2165
2166   if (journal_size > JOURNAL_MAX)
2167     journal_new_file();
2168
2169   pthread_mutex_unlock(&journal_lock);
2170
2171   if (chars > 0)
2172   {
2173     pthread_mutex_lock(&stats_lock);
2174     stats_journal_bytes += chars;
2175     pthread_mutex_unlock(&stats_lock);
2176   }
2177
2178   return chars;
2179 } /* }}} static int journal_write */
2180
2181 static int journal_replay (const char *file) /* {{{ */
2182 {
2183   FILE *fh;
2184   int entry_cnt = 0;
2185   int fail_cnt = 0;
2186   uint64_t line = 0;
2187   char entry[CMD_MAX];
2188   time_t now;
2189
2190   if (file == NULL) return 0;
2191
2192   {
2193     char *reason = "unknown error";
2194     int status = 0;
2195     struct stat statbuf;
2196
2197     memset(&statbuf, 0, sizeof(statbuf));
2198     if (stat(file, &statbuf) != 0)
2199     {
2200       reason = "stat error";
2201       status = errno;
2202     }
2203     else if (!S_ISREG(statbuf.st_mode))
2204     {
2205       reason = "not a regular file";
2206       status = EPERM;
2207     }
2208     if (statbuf.st_uid != daemon_uid)
2209     {
2210       reason = "not owned by daemon user";
2211       status = EACCES;
2212     }
2213     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2214     {
2215       reason = "must not be user/group writable";
2216       status = EACCES;
2217     }
2218
2219     if (status != 0)
2220     {
2221       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2222                file, rrd_strerror(status), reason);
2223       return 0;
2224     }
2225   }
2226
2227   fh = fopen(file, "r");
2228   if (fh == NULL)
2229   {
2230     if (errno != ENOENT)
2231       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2232                file, rrd_strerror(errno));
2233     return 0;
2234   }
2235   else
2236     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2237
2238   now = time(NULL);
2239
2240   while(!feof(fh))
2241   {
2242     size_t entry_len;
2243
2244     ++line;
2245     if (fgets(entry, sizeof(entry), fh) == NULL)
2246       break;
2247     entry_len = strlen(entry);
2248
2249     /* check \n termination in case journal writing crashed mid-line */
2250     if (entry_len == 0)
2251       continue;
2252     else if (entry[entry_len - 1] != '\n')
2253     {
2254       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2255       ++fail_cnt;
2256       continue;
2257     }
2258
2259     entry[entry_len - 1] = '\0';
2260
2261     if (handle_request(NULL, now, entry, entry_len) == 0)
2262       ++entry_cnt;
2263     else
2264       ++fail_cnt;
2265   }
2266
2267   fclose(fh);
2268
2269   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2270            entry_cnt, fail_cnt);
2271
2272   return entry_cnt > 0 ? 1 : 0;
2273 } /* }}} static int journal_replay */
2274
2275 static int journal_sort(const void *v1, const void *v2)
2276 {
2277   char **jn1 = (char **) v1;
2278   char **jn2 = (char **) v2;
2279
2280   return strcmp(*jn1,*jn2);
2281 }
2282
2283 static void journal_init(void) /* {{{ */
2284 {
2285   int had_journal = 0;
2286   DIR *dir;
2287   struct dirent *dent;
2288   char path[PATH_MAX+1];
2289
2290   if (journal_dir == NULL) return;
2291
2292   pthread_mutex_lock(&journal_lock);
2293
2294   journal_cur = calloc(1, sizeof(journal_set));
2295   if (journal_cur == NULL)
2296   {
2297     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2298     return;
2299   }
2300
2301   RRDD_LOG(LOG_INFO, "checking for journal files");
2302
2303   /* Handle old journal files during transition.  This gives them the
2304    * correct sort order.  TODO: remove after first release
2305    */
2306   {
2307     char old_path[PATH_MAX+1];
2308     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2309     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2310     rename(old_path, path);
2311
2312     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2313     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2314     rename(old_path, path);
2315   }
2316
2317   dir = opendir(journal_dir);
2318   if (!dir) {
2319     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2320     return;
2321   }
2322   while ((dent = readdir(dir)) != NULL)
2323   {
2324     /* looks like a journal file? */
2325     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2326       continue;
2327
2328     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2329
2330     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2331     {
2332       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2333                dent->d_name);
2334       break;
2335     }
2336   }
2337   closedir(dir);
2338
2339   qsort(journal_cur->files, journal_cur->files_num,
2340         sizeof(journal_cur->files[0]), journal_sort);
2341
2342   for (uint i=0; i < journal_cur->files_num; i++)
2343     had_journal += journal_replay(journal_cur->files[i]);
2344
2345   journal_new_file();
2346
2347   /* it must have been a crash.  start a flush */
2348   if (had_journal && config_flush_at_shutdown)
2349     flush_old_values(-1);
2350
2351   pthread_mutex_unlock(&journal_lock);
2352
2353   RRDD_LOG(LOG_INFO, "journal processing complete");
2354
2355 } /* }}} static void journal_init */
2356
2357 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2358 {
2359   assert(sock != NULL);
2360
2361   free(sock->rbuf);  sock->rbuf = NULL;
2362   free(sock->wbuf);  sock->wbuf = NULL;
2363   free(sock);
2364 } /* }}} void free_listen_socket */
2365
2366 static void close_connection(listen_socket_t *sock) /* {{{ */
2367 {
2368   if (sock->fd >= 0)
2369   {
2370     close(sock->fd);
2371     sock->fd = -1;
2372   }
2373
2374   free_listen_socket(sock);
2375
2376 } /* }}} void close_connection */
2377
2378 static void *connection_thread_main (void *args) /* {{{ */
2379 {
2380   listen_socket_t *sock;
2381   int fd;
2382
2383   sock = (listen_socket_t *) args;
2384   fd = sock->fd;
2385
2386   /* init read buffers */
2387   sock->next_read = sock->next_cmd = 0;
2388   sock->rbuf = malloc(RBUF_SIZE);
2389   if (sock->rbuf == NULL)
2390   {
2391     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2392     close_connection(sock);
2393     return NULL;
2394   }
2395
2396   pthread_mutex_lock (&connection_threads_lock);
2397   connection_threads_num++;
2398   pthread_mutex_unlock (&connection_threads_lock);
2399
2400   while (state == RUNNING)
2401   {
2402     char *cmd;
2403     ssize_t cmd_len;
2404     ssize_t rbytes;
2405     time_t now;
2406
2407     struct pollfd pollfd;
2408     int status;
2409
2410     pollfd.fd = fd;
2411     pollfd.events = POLLIN | POLLPRI;
2412     pollfd.revents = 0;
2413
2414     status = poll (&pollfd, 1, /* timeout = */ 500);
2415     if (state != RUNNING)
2416       break;
2417     else if (status == 0) /* timeout */
2418       continue;
2419     else if (status < 0) /* error */
2420     {
2421       status = errno;
2422       if (status != EINTR)
2423         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2424       continue;
2425     }
2426
2427     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2428       break;
2429     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2430     {
2431       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2432           "poll(2) returned something unexpected: %#04hx",
2433           pollfd.revents);
2434       break;
2435     }
2436
2437     rbytes = read(fd, sock->rbuf + sock->next_read,
2438                   RBUF_SIZE - sock->next_read);
2439     if (rbytes < 0)
2440     {
2441       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2442       break;
2443     }
2444     else if (rbytes == 0)
2445       break; /* eof */
2446
2447     sock->next_read += rbytes;
2448
2449     if (sock->batch_start)
2450       now = sock->batch_start;
2451     else
2452       now = time(NULL);
2453
2454     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2455     {
2456       status = handle_request (sock, now, cmd, cmd_len+1);
2457       if (status != 0)
2458         goto out_close;
2459     }
2460   }
2461
2462 out_close:
2463   close_connection(sock);
2464
2465   /* Remove this thread from the connection threads list */
2466   pthread_mutex_lock (&connection_threads_lock);
2467   connection_threads_num--;
2468   if (connection_threads_num <= 0)
2469     pthread_cond_broadcast(&connection_threads_done);
2470   pthread_mutex_unlock (&connection_threads_lock);
2471
2472   return (NULL);
2473 } /* }}} void *connection_thread_main */
2474
2475 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2476 {
2477   int fd;
2478   struct sockaddr_un sa;
2479   listen_socket_t *temp;
2480   int status;
2481   const char *path;
2482   char *path_copy, *dir;
2483
2484   path = sock->addr;
2485   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2486     path += strlen("unix:");
2487
2488   /* dirname may modify its argument */
2489   path_copy = strdup(path);
2490   if (path_copy == NULL)
2491   {
2492     fprintf(stderr, "rrdcached: strdup(): %s\n",
2493         rrd_strerror(errno));
2494     return (-1);
2495   }
2496
2497   dir = dirname(path_copy);
2498   if (rrd_mkdir_p(dir, 0777) != 0)
2499   {
2500     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2501         dir, rrd_strerror(errno));
2502     return (-1);
2503   }
2504
2505   free(path_copy);
2506
2507   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2508       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2509   if (temp == NULL)
2510   {
2511     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2512     return (-1);
2513   }
2514   listen_fds = temp;
2515   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2516
2517   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2518   if (fd < 0)
2519   {
2520     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2521              rrd_strerror(errno));
2522     return (-1);
2523   }
2524
2525   memset (&sa, 0, sizeof (sa));
2526   sa.sun_family = AF_UNIX;
2527   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2528
2529   /* if we've gotten this far, we own the pid file.  any daemon started
2530    * with the same args must not be alive.  therefore, ensure that we can
2531    * create the socket...
2532    */
2533   unlink(path);
2534
2535   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2536   if (status != 0)
2537   {
2538     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2539              path, rrd_strerror(errno));
2540     close (fd);
2541     return (-1);
2542   }
2543
2544   /* tweak the sockets group ownership */
2545   if (sock->socket_group != (gid_t)-1)
2546   {
2547     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2548          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2549     {
2550       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2551     }
2552   }
2553
2554   if (sock->socket_permissions != (mode_t)-1)
2555   {
2556     if (chmod(path, sock->socket_permissions) != 0)
2557       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2558           (unsigned int)sock->socket_permissions, strerror(errno));
2559   }
2560
2561   status = listen (fd, /* backlog = */ 10);
2562   if (status != 0)
2563   {
2564     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2565              path, rrd_strerror(errno));
2566     close (fd);
2567     unlink (path);
2568     return (-1);
2569   }
2570
2571   listen_fds[listen_fds_num].fd = fd;
2572   listen_fds[listen_fds_num].family = PF_UNIX;
2573   strncpy(listen_fds[listen_fds_num].addr, path,
2574           sizeof (listen_fds[listen_fds_num].addr) - 1);
2575   listen_fds_num++;
2576
2577   return (0);
2578 } /* }}} int open_listen_socket_unix */
2579
2580 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2581 {
2582   struct addrinfo ai_hints;
2583   struct addrinfo *ai_res;
2584   struct addrinfo *ai_ptr;
2585   char addr_copy[NI_MAXHOST];
2586   char *addr;
2587   char *port;
2588   int status;
2589
2590   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2591   addr_copy[sizeof (addr_copy) - 1] = 0;
2592   addr = addr_copy;
2593
2594   memset (&ai_hints, 0, sizeof (ai_hints));
2595   ai_hints.ai_flags = 0;
2596 #ifdef AI_ADDRCONFIG
2597   ai_hints.ai_flags |= AI_ADDRCONFIG;
2598 #endif
2599   ai_hints.ai_family = AF_UNSPEC;
2600   ai_hints.ai_socktype = SOCK_STREAM;
2601
2602   port = NULL;
2603   if (*addr == '[') /* IPv6+port format */
2604   {
2605     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2606     addr++;
2607
2608     port = strchr (addr, ']');
2609     if (port == NULL)
2610     {
2611       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2612       return (-1);
2613     }
2614     *port = 0;
2615     port++;
2616
2617     if (*port == ':')
2618       port++;
2619     else if (*port == 0)
2620       port = NULL;
2621     else
2622     {
2623       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2624       return (-1);
2625     }
2626   } /* if (*addr == '[') */
2627   else
2628   {
2629     port = rindex(addr, ':');
2630     if (port != NULL)
2631     {
2632       *port = 0;
2633       port++;
2634     }
2635   }
2636   ai_res = NULL;
2637   status = getaddrinfo (addr,
2638                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2639                         &ai_hints, &ai_res);
2640   if (status != 0)
2641   {
2642     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2643              addr, gai_strerror (status));
2644     return (-1);
2645   }
2646
2647   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2648   {
2649     int fd;
2650     listen_socket_t *temp;
2651     int one = 1;
2652
2653     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2654         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2655     if (temp == NULL)
2656     {
2657       fprintf (stderr,
2658                "rrdcached: open_listen_socket_network: realloc failed.\n");
2659       continue;
2660     }
2661     listen_fds = temp;
2662     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2663
2664     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2665     if (fd < 0)
2666     {
2667       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2668                rrd_strerror(errno));
2669       continue;
2670     }
2671
2672     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2673
2674     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2675     if (status != 0)
2676     {
2677       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2678                sock->addr, rrd_strerror(errno));
2679       close (fd);
2680       continue;
2681     }
2682
2683     status = listen (fd, /* backlog = */ 10);
2684     if (status != 0)
2685     {
2686       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2687                sock->addr, rrd_strerror(errno));
2688       close (fd);
2689       freeaddrinfo(ai_res);
2690       return (-1);
2691     }
2692
2693     listen_fds[listen_fds_num].fd = fd;
2694     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2695     listen_fds_num++;
2696   } /* for (ai_ptr) */
2697
2698   freeaddrinfo(ai_res);
2699   return (0);
2700 } /* }}} static int open_listen_socket_network */
2701
2702 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2703 {
2704   assert(sock != NULL);
2705   assert(sock->addr != NULL);
2706
2707   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2708       || sock->addr[0] == '/')
2709     return (open_listen_socket_unix(sock));
2710   else
2711     return (open_listen_socket_network(sock));
2712 } /* }}} int open_listen_socket */
2713
2714 static int close_listen_sockets (void) /* {{{ */
2715 {
2716   size_t i;
2717
2718   for (i = 0; i < listen_fds_num; i++)
2719   {
2720     close (listen_fds[i].fd);
2721
2722     if (listen_fds[i].family == PF_UNIX)
2723       unlink(listen_fds[i].addr);
2724   }
2725
2726   free (listen_fds);
2727   listen_fds = NULL;
2728   listen_fds_num = 0;
2729
2730   return (0);
2731 } /* }}} int close_listen_sockets */
2732
2733 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2734 {
2735   struct pollfd *pollfds;
2736   int pollfds_num;
2737   int status;
2738   int i;
2739
2740   if (listen_fds_num < 1)
2741   {
2742     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2743     return (NULL);
2744   }
2745
2746   pollfds_num = listen_fds_num;
2747   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2748   if (pollfds == NULL)
2749   {
2750     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2751     return (NULL);
2752   }
2753   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2754
2755   RRDD_LOG(LOG_INFO, "listening for connections");
2756
2757   while (state == RUNNING)
2758   {
2759     for (i = 0; i < pollfds_num; i++)
2760     {
2761       pollfds[i].fd = listen_fds[i].fd;
2762       pollfds[i].events = POLLIN | POLLPRI;
2763       pollfds[i].revents = 0;
2764     }
2765
2766     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2767     if (state != RUNNING)
2768       break;
2769     else if (status == 0) /* timeout */
2770       continue;
2771     else if (status < 0) /* error */
2772     {
2773       status = errno;
2774       if (status != EINTR)
2775       {
2776         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2777       }
2778       continue;
2779     }
2780
2781     for (i = 0; i < pollfds_num; i++)
2782     {
2783       listen_socket_t *client_sock;
2784       struct sockaddr_storage client_sa;
2785       socklen_t client_sa_size;
2786       pthread_t tid;
2787       pthread_attr_t attr;
2788
2789       if (pollfds[i].revents == 0)
2790         continue;
2791
2792       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2793       {
2794         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2795             "poll(2) returned something unexpected for listen FD #%i.",
2796             pollfds[i].fd);
2797         continue;
2798       }
2799
2800       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2801       if (client_sock == NULL)
2802       {
2803         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2804         continue;
2805       }
2806       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2807
2808       client_sa_size = sizeof (client_sa);
2809       client_sock->fd = accept (pollfds[i].fd,
2810           (struct sockaddr *) &client_sa, &client_sa_size);
2811       if (client_sock->fd < 0)
2812       {
2813         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2814         free(client_sock);
2815         continue;
2816       }
2817
2818       pthread_attr_init (&attr);
2819       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2820
2821       status = pthread_create (&tid, &attr, connection_thread_main,
2822                                client_sock);
2823       if (status != 0)
2824       {
2825         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2826         close_connection(client_sock);
2827         continue;
2828       }
2829     } /* for (pollfds_num) */
2830   } /* while (state == RUNNING) */
2831
2832   RRDD_LOG(LOG_INFO, "starting shutdown");
2833
2834   close_listen_sockets ();
2835
2836   pthread_mutex_lock (&connection_threads_lock);
2837   while (connection_threads_num > 0)
2838     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2839   pthread_mutex_unlock (&connection_threads_lock);
2840
2841   free(pollfds);
2842
2843   return (NULL);
2844 } /* }}} void *listen_thread_main */
2845
2846 static int daemonize (void) /* {{{ */
2847 {
2848   int pid_fd;
2849   char *base_dir;
2850
2851   daemon_uid = geteuid();
2852
2853   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2854   if (pid_fd < 0)
2855     pid_fd = check_pidfile();
2856   if (pid_fd < 0)
2857     return pid_fd;
2858
2859   /* open all the listen sockets */
2860   if (config_listen_address_list_len > 0)
2861   {
2862     for (size_t i = 0; i < config_listen_address_list_len; i++)
2863       open_listen_socket (config_listen_address_list[i]);
2864
2865     rrd_free_ptrs((void ***) &config_listen_address_list,
2866                   &config_listen_address_list_len);
2867   }
2868   else
2869   {
2870     listen_socket_t sock;
2871     memset(&sock, 0, sizeof(sock));
2872     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2873     open_listen_socket (&sock);
2874   }
2875
2876   if (listen_fds_num < 1)
2877   {
2878     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2879     goto error;
2880   }
2881
2882   if (!stay_foreground)
2883   {
2884     pid_t child;
2885
2886     child = fork ();
2887     if (child < 0)
2888     {
2889       fprintf (stderr, "daemonize: fork(2) failed.\n");
2890       goto error;
2891     }
2892     else if (child > 0)
2893       exit(0);
2894
2895     /* Become session leader */
2896     setsid ();
2897
2898     /* Open the first three file descriptors to /dev/null */
2899     close (2);
2900     close (1);
2901     close (0);
2902
2903     open ("/dev/null", O_RDWR);
2904     if (dup(0) == -1 || dup(0) == -1){
2905         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2906     }
2907   } /* if (!stay_foreground) */
2908
2909   /* Change into the /tmp directory. */
2910   base_dir = (config_base_dir != NULL)
2911     ? config_base_dir
2912     : "/tmp";
2913
2914   if (chdir (base_dir) != 0)
2915   {
2916     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2917     goto error;
2918   }
2919
2920   install_signal_handlers();
2921
2922   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2923   RRDD_LOG(LOG_INFO, "starting up");
2924
2925   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2926                                 (GDestroyNotify) free_cache_item);
2927   if (cache_tree == NULL)
2928   {
2929     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2930     goto error;
2931   }
2932
2933   return write_pidfile (pid_fd);
2934
2935 error:
2936   remove_pidfile();
2937   return -1;
2938 } /* }}} int daemonize */
2939
2940 static int cleanup (void) /* {{{ */
2941 {
2942   pthread_cond_broadcast (&flush_cond);
2943   pthread_join (flush_thread, NULL);
2944
2945   pthread_cond_broadcast (&queue_cond);
2946   for (int i = 0; i < config_queue_threads; i++)
2947     pthread_join (queue_threads[i], NULL);
2948
2949   if (config_flush_at_shutdown)
2950   {
2951     assert(cache_queue_head == NULL);
2952     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2953   }
2954
2955   free(queue_threads);
2956   free(config_base_dir);
2957
2958   pthread_mutex_lock(&cache_lock);
2959   g_tree_destroy(cache_tree);
2960
2961   pthread_mutex_lock(&journal_lock);
2962   journal_done();
2963
2964   RRDD_LOG(LOG_INFO, "goodbye");
2965   closelog ();
2966
2967   remove_pidfile ();
2968   free(config_pid_file);
2969
2970   return (0);
2971 } /* }}} int cleanup */
2972
2973 static int read_options (int argc, char **argv) /* {{{ */
2974 {
2975   int option;
2976   int status = 0;
2977
2978   char **permissions = NULL;
2979   size_t permissions_len = 0;
2980
2981   gid_t  socket_group = (gid_t)-1;
2982   mode_t socket_permissions = (mode_t)-1;
2983
2984   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2985   {
2986     switch (option)
2987     {
2988       case 'g':
2989         stay_foreground=1;
2990         break;
2991
2992       case 'l':
2993       {
2994         listen_socket_t *new;
2995
2996         new = malloc(sizeof(listen_socket_t));
2997         if (new == NULL)
2998         {
2999           fprintf(stderr, "read_options: malloc failed.\n");
3000           return(2);
3001         }
3002         memset(new, 0, sizeof(listen_socket_t));
3003
3004         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3005
3006         /* Add permissions to the socket {{{ */
3007         if (permissions_len != 0)
3008         {
3009           size_t i;
3010           for (i = 0; i < permissions_len; i++)
3011           {
3012             status = socket_permission_add (new, permissions[i]);
3013             if (status != 0)
3014             {
3015               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3016                   "socket failed. Most likely, this permission doesn't "
3017                   "exist. Check your command line.\n", permissions[i]);
3018               status = 4;
3019             }
3020           }
3021         }
3022         else /* if (permissions_len == 0) */
3023         {
3024           /* Add permission for ALL commands to the socket. */
3025           size_t i;
3026           for (i = 0; i < list_of_commands_len; i++)
3027           {
3028             status = socket_permission_add (new, list_of_commands[i].cmd);
3029             if (status != 0)
3030             {
3031               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3032                   "socket failed. This should never happen, ever! Sorry.\n",
3033                   permissions[i]);
3034               status = 4;
3035             }
3036           }
3037         }
3038         /* }}} Done adding permissions. */
3039
3040         new->socket_group = socket_group;
3041         new->socket_permissions = socket_permissions;
3042
3043         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3044                          &config_listen_address_list_len, new))
3045         {
3046           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3047           return (2);
3048         }
3049       }
3050       break;
3051
3052       /* set socket group permissions */
3053       case 's':
3054       {
3055         gid_t group_gid;
3056         struct group *grp;
3057
3058         group_gid = strtoul(optarg, NULL, 10);
3059         if (errno != EINVAL && group_gid>0)
3060         {
3061           /* we were passed a number */
3062           grp = getgrgid(group_gid);
3063         }
3064         else
3065         {
3066           grp = getgrnam(optarg);
3067         }
3068
3069         if (grp)
3070         {
3071           socket_group = grp->gr_gid;
3072         }
3073         else
3074         {
3075           /* no idea what the user wanted... */
3076           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3077           return (5);
3078         }
3079       }
3080       break;
3081
3082       /* set socket file permissions */
3083       case 'm':
3084       {
3085         long  tmp;
3086         char *endptr = NULL;
3087
3088         tmp = strtol (optarg, &endptr, 8);
3089         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3090             || (tmp > 07777) || (tmp < 0)) {
3091           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3092               optarg);
3093           return (5);
3094         }
3095
3096         socket_permissions = (mode_t)tmp;
3097       }
3098       break;
3099
3100       case 'P':
3101       {
3102         char *optcopy;
3103         char *saveptr;
3104         char *dummy;
3105         char *ptr;
3106
3107         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3108
3109         optcopy = strdup (optarg);
3110         dummy = optcopy;
3111         saveptr = NULL;
3112         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3113         {
3114           dummy = NULL;
3115           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3116         }
3117
3118         free (optcopy);
3119       }
3120       break;
3121
3122       case 'f':
3123       {
3124         int temp;
3125
3126         temp = atoi (optarg);
3127         if (temp > 0)
3128           config_flush_interval = temp;
3129         else
3130         {
3131           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3132           status = 3;
3133         }
3134       }
3135       break;
3136
3137       case 'w':
3138       {
3139         int temp;
3140
3141         temp = atoi (optarg);
3142         if (temp > 0)
3143           config_write_interval = temp;
3144         else
3145         {
3146           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3147           status = 2;
3148         }
3149       }
3150       break;
3151
3152       case 'z':
3153       {
3154         int temp;
3155
3156         temp = atoi(optarg);
3157         if (temp > 0)
3158           config_write_jitter = temp;
3159         else
3160         {
3161           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3162           status = 2;
3163         }
3164
3165         break;
3166       }
3167
3168       case 't':
3169       {
3170         int threads;
3171         threads = atoi(optarg);
3172         if (threads >= 1)
3173           config_queue_threads = threads;
3174         else
3175         {
3176           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3177           return 1;
3178         }
3179       }
3180       break;
3181
3182       case 'B':
3183         config_write_base_only = 1;
3184         break;
3185
3186       case 'b':
3187       {
3188         size_t len;
3189         char base_realpath[PATH_MAX];
3190
3191         if (config_base_dir != NULL)
3192           free (config_base_dir);
3193         config_base_dir = strdup (optarg);
3194         if (config_base_dir == NULL)
3195         {
3196           fprintf (stderr, "read_options: strdup failed.\n");
3197           return (3);
3198         }
3199
3200         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3201         {
3202           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3203               config_base_dir, rrd_strerror (errno));
3204           return (3);
3205         }
3206
3207         /* make sure that the base directory is not resolved via
3208          * symbolic links.  this makes some performance-enhancing
3209          * assumptions possible (we don't have to resolve paths
3210          * that start with a "/")
3211          */
3212         if (realpath(config_base_dir, base_realpath) == NULL)
3213         {
3214           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3215               "%s\n", config_base_dir, rrd_strerror(errno));
3216           return 5;
3217         }
3218
3219         len = strlen (config_base_dir);
3220         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3221         {
3222           config_base_dir[len - 1] = 0;
3223           len--;
3224         }
3225
3226         if (len < 1)
3227         {
3228           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3229           return (4);
3230         }
3231
3232         _config_base_dir_len = len;
3233
3234         len = strlen (base_realpath);
3235         while ((len > 0) && (base_realpath[len - 1] == '/'))
3236         {
3237           base_realpath[len - 1] = '\0';
3238           len--;
3239         }
3240
3241         if (strncmp(config_base_dir,
3242                          base_realpath, sizeof(base_realpath)) != 0)
3243         {
3244           fprintf(stderr,
3245                   "Base directory (-b) resolved via file system links!\n"
3246                   "Please consult rrdcached '-b' documentation!\n"
3247                   "Consider specifying the real directory (%s)\n",
3248                   base_realpath);
3249           return 5;
3250         }
3251       }
3252       break;
3253
3254       case 'p':
3255       {
3256         if (config_pid_file != NULL)
3257           free (config_pid_file);
3258         config_pid_file = strdup (optarg);
3259         if (config_pid_file == NULL)
3260         {
3261           fprintf (stderr, "read_options: strdup failed.\n");
3262           return (3);
3263         }
3264       }
3265       break;
3266
3267       case 'F':
3268         config_flush_at_shutdown = 1;
3269         break;
3270
3271       case 'j':
3272       {
3273         char journal_dir_actual[PATH_MAX];
3274         const char *dir;
3275         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3276
3277         status = rrd_mkdir_p(dir, 0777);
3278         if (status != 0)
3279         {
3280           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3281               dir, rrd_strerror(errno));
3282           return 6;
3283         }
3284
3285         if (access(dir, R_OK|W_OK|X_OK) != 0)
3286         {
3287           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3288                   errno ? rrd_strerror(errno) : "");
3289           return 6;
3290         }
3291       }
3292       break;
3293
3294       case 'a':
3295       {
3296         int temp = atoi(optarg);
3297         if (temp > 0)
3298           config_alloc_chunk = temp;
3299         else
3300         {
3301           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3302           return 10;
3303         }
3304       }
3305       break;
3306
3307       case 'h':
3308       case '?':
3309         printf ("RRDCacheD %s\n"
3310             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3311             "\n"
3312             "Usage: rrdcached [options]\n"
3313             "\n"
3314             "Valid options are:\n"
3315             "  -l <address>  Socket address to listen to.\n"
3316             "  -P <perms>    Sets the permissions to assign to all following "
3317                             "sockets\n"
3318             "  -w <seconds>  Interval in which to write data.\n"
3319             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3320             "  -t <threads>  Number of write threads.\n"
3321             "  -f <seconds>  Interval in which to flush dead data.\n"
3322             "  -p <file>     Location of the PID-file.\n"
3323             "  -b <dir>      Base directory to change to.\n"
3324             "  -B            Restrict file access to paths within -b <dir>\n"
3325             "  -g            Do not fork and run in the foreground.\n"
3326             "  -j <dir>      Directory in which to create the journal files.\n"
3327             "  -F            Always flush all updates at shutdown\n"
3328             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3329             "                (the socket will also have read/write permissions "
3330                             "for that group)\n"
3331             "  -m <mode>     File permissions (octal) of all following UNIX "
3332                             "sockets\n"
3333             "  -a <size>     Memory allocation chunk size. Default is 1."
3334             "\n"
3335             "For more information and a detailed description of all options "
3336             "please refer\n"
3337             "to the rrdcached(1) manual page.\n",
3338             VERSION);
3339         if (option == 'h')
3340           status = -1;
3341         else
3342           status = 1;
3343         break;
3344     } /* switch (option) */
3345   } /* while (getopt) */
3346
3347   /* advise the user when values are not sane */
3348   if (config_flush_interval < 2 * config_write_interval)
3349     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3350             " 2x write interval (-w) !\n");
3351   if (config_write_jitter > config_write_interval)
3352     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3353             " write interval (-w) !\n");
3354
3355   if (config_write_base_only && config_base_dir == NULL)
3356     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3357             "  Consult the rrdcached documentation\n");
3358
3359   if (journal_dir == NULL)
3360     config_flush_at_shutdown = 1;
3361
3362   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3363
3364   return (status);
3365 } /* }}} int read_options */
3366
3367 int main (int argc, char **argv)
3368 {
3369   int status;
3370
3371   status = read_options (argc, argv);
3372   if (status != 0)
3373   {
3374     if (status < 0)
3375       status = 0;
3376     return (status);
3377   }
3378
3379   status = daemonize ();
3380   if (status != 0)
3381   {
3382     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3383     return (1);
3384   }
3385
3386   journal_init();
3387
3388   /* start the queue threads */
3389   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3390   if (queue_threads == NULL)
3391   {
3392     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3393     cleanup();
3394     return (1);
3395   }
3396   for (int i = 0; i < config_queue_threads; i++)
3397   {
3398     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3399     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3400     if (status != 0)
3401     {
3402       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3403       cleanup();
3404       return (1);
3405     }
3406   }
3407
3408   /* start the flush thread */
3409   memset(&flush_thread, 0, sizeof(flush_thread));
3410   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3411   if (status != 0)
3412   {
3413     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3414     cleanup();
3415     return (1);
3416   }
3417
3418   listen_thread_main (NULL);
3419   cleanup ();
3420
3421   return (0);
3422 } /* int main */
3423
3424 /*
3425  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3426  */