3be6e7845487186af62697a67761a2ec0aa5d05f
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #include <glib-2.0/glib.h>
113 /* }}} */
114
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) \
118       fprintf(stderr, __VA_ARGS__); \
119     syslog ((severity), __VA_ARGS__); \
120   } while (0)
121
122 /*
123  * Types
124  */
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126
127 struct listen_socket_s
128 {
129   int fd;
130   char addr[PATH_MAX + 1];
131   int family;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144
145   uint32_t permissions;
146
147   gid_t  socket_group;
148   mode_t socket_permissions;
149 };
150 typedef struct listen_socket_s listen_socket_t;
151
152 struct command_s;
153 typedef struct command_s command_t;
154 /* note: guard against "unused" warnings in the handlers */
155 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
156                         time_t UNUSED(now),\
157                         char  UNUSED(*buffer),\
158                         size_t UNUSED(buffer_size)
159
160 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
161                         DISPATCH_PROTO
162
163 struct command_s {
164   char   *cmd;
165   int (*handler)(HANDLER_PROTO);
166
167   char  context;                /* where we expect to see it */
168 #define CMD_CONTEXT_CLIENT      (1<<0)
169 #define CMD_CONTEXT_BATCH       (1<<1)
170 #define CMD_CONTEXT_JOURNAL     (1<<2)
171 #define CMD_CONTEXT_ANY         (0x7f)
172
173   char *syntax;
174   char *help;
175 };
176
177 struct cache_item_s;
178 typedef struct cache_item_s cache_item_t;
179 struct cache_item_s
180 {
181   char *file;
182   char **values;
183   size_t values_num;            /* number of valid pointers */
184   size_t values_alloc;          /* number of allocated pointers */
185   time_t last_flush_time;
186   time_t last_update_stamp;
187 #define CI_FLAGS_IN_TREE  (1<<0)
188 #define CI_FLAGS_IN_QUEUE (1<<1)
189   int flags;
190   pthread_cond_t  flushed;
191   cache_item_t *prev;
192   cache_item_t *next;
193 };
194
195 struct callback_flush_data_s
196 {
197   time_t now;
198   time_t abs_timeout;
199   char **keys;
200   size_t keys_num;
201 };
202 typedef struct callback_flush_data_s callback_flush_data_t;
203
204 enum queue_side_e
205 {
206   HEAD,
207   TAIL
208 };
209 typedef enum queue_side_e queue_side_t;
210
211 /* describe a set of journal files */
212 typedef struct {
213   char **files;
214   size_t files_num;
215 } journal_set;
216
217 /* max length of socket command or response */
218 #define CMD_MAX 4096
219 #define RBUF_SIZE (CMD_MAX*2)
220
221 /*
222  * Variables
223  */
224 static int stay_foreground = 0;
225 static uid_t daemon_uid;
226
227 static listen_socket_t *listen_fds = NULL;
228 static size_t listen_fds_num = 0;
229
230 enum {
231   RUNNING,              /* normal operation */
232   FLUSHING,             /* flushing remaining values */
233   SHUTDOWN              /* shutting down */
234 } state = RUNNING;
235
236 static pthread_t *queue_threads;
237 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
238 static int config_queue_threads = 4;
239
240 static pthread_t flush_thread;
241 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
242
243 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
244 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
245 static int connection_threads_num = 0;
246
247 /* Cache stuff */
248 static GTree          *cache_tree = NULL;
249 static cache_item_t   *cache_queue_head = NULL;
250 static cache_item_t   *cache_queue_tail = NULL;
251 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
252
253 static int config_write_interval = 300;
254 static int config_write_jitter   = 0;
255 static int config_flush_interval = 3600;
256 static int config_flush_at_shutdown = 0;
257 static char *config_pid_file = NULL;
258 static char *config_base_dir = NULL;
259 static size_t _config_base_dir_len = 0;
260 static int config_write_base_only = 0;
261 static size_t config_alloc_chunk = 1;
262
263 static listen_socket_t **config_listen_address_list = NULL;
264 static size_t config_listen_address_list_len = 0;
265
266 static uint64_t stats_queue_length = 0;
267 static uint64_t stats_updates_received = 0;
268 static uint64_t stats_flush_received = 0;
269 static uint64_t stats_updates_written = 0;
270 static uint64_t stats_data_sets_written = 0;
271 static uint64_t stats_journal_bytes = 0;
272 static uint64_t stats_journal_rotate = 0;
273 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
274
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL;         /* current journal file handle */
282 static long  journal_size = 0;          /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
288
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
291
292 /* 
293  * Functions
294  */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298   state = FLUSHING;
299   pthread_cond_broadcast(&flush_cond);
300   pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
302
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
304 {
305   sig_common("INT");
306 } /* }}} void sig_int_handler */
307
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
309 {
310   sig_common("TERM");
311 } /* }}} void sig_term_handler */
312
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
314 {
315   config_flush_at_shutdown = 1;
316   sig_common("USR1");
317 } /* }}} void sig_usr1_handler */
318
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
320 {
321   config_flush_at_shutdown = 0;
322   sig_common("USR2");
323 } /* }}} void sig_usr2_handler */
324
325 static void install_signal_handlers(void) /* {{{ */
326 {
327   /* These structures are static, because `sigaction' behaves weird if the are
328    * overwritten.. */
329   static struct sigaction sa_int;
330   static struct sigaction sa_term;
331   static struct sigaction sa_pipe;
332   static struct sigaction sa_usr1;
333   static struct sigaction sa_usr2;
334
335   /* Install signal handlers */
336   memset (&sa_int, 0, sizeof (sa_int));
337   sa_int.sa_handler = sig_int_handler;
338   sigaction (SIGINT, &sa_int, NULL);
339
340   memset (&sa_term, 0, sizeof (sa_term));
341   sa_term.sa_handler = sig_term_handler;
342   sigaction (SIGTERM, &sa_term, NULL);
343
344   memset (&sa_pipe, 0, sizeof (sa_pipe));
345   sa_pipe.sa_handler = SIG_IGN;
346   sigaction (SIGPIPE, &sa_pipe, NULL);
347
348   memset (&sa_pipe, 0, sizeof (sa_usr1));
349   sa_usr1.sa_handler = sig_usr1_handler;
350   sigaction (SIGUSR1, &sa_usr1, NULL);
351
352   memset (&sa_usr2, 0, sizeof (sa_usr2));
353   sa_usr2.sa_handler = sig_usr2_handler;
354   sigaction (SIGUSR2, &sa_usr2, NULL);
355
356 } /* }}} void install_signal_handlers */
357
358 static int open_pidfile(char *action, int oflag) /* {{{ */
359 {
360   int fd;
361   const char *file;
362   char *file_copy, *dir;
363
364   file = (config_pid_file != NULL)
365     ? config_pid_file
366     : LOCALSTATEDIR "/run/rrdcached.pid";
367
368   /* dirname may modify its argument */
369   file_copy = strdup(file);
370   if (file_copy == NULL)
371   {
372     fprintf(stderr, "rrdcached: strdup(): %s\n",
373         rrd_strerror(errno));
374     return -1;
375   }
376
377   dir = dirname(file_copy);
378   if (rrd_mkdir_p(dir, 0777) != 0)
379   {
380     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381         dir, rrd_strerror(errno));
382     return -1;
383   }
384
385   free(file_copy);
386
387   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
388   if (fd < 0)
389     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390             action, file, rrd_strerror(errno));
391
392   return(fd);
393 } /* }}} static int open_pidfile */
394
395 /* check existing pid file to see whether a daemon is running */
396 static int check_pidfile(void)
397 {
398   int pid_fd;
399   pid_t pid;
400   char pid_str[16];
401
402   pid_fd = open_pidfile("open", O_RDWR);
403   if (pid_fd < 0)
404     return pid_fd;
405
406   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
407     return -1;
408
409   pid = atoi(pid_str);
410   if (pid <= 0)
411     return -1;
412
413   /* another running process that we can signal COULD be
414    * a competing rrdcached */
415   if (pid != getpid() && kill(pid, 0) == 0)
416   {
417     fprintf(stderr,
418             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
419     close(pid_fd);
420     return -1;
421   }
422
423   lseek(pid_fd, 0, SEEK_SET);
424   if (ftruncate(pid_fd, 0) == -1)
425   {
426     fprintf(stderr,
427             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
431
432   fprintf(stderr,
433           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434           "rrdcached: starting normally.\n", pid);
435
436   return pid_fd;
437 } /* }}} static int check_pidfile */
438
439 static int write_pidfile (int fd) /* {{{ */
440 {
441   pid_t pid;
442   FILE *fh;
443
444   pid = getpid ();
445
446   fh = fdopen (fd, "w");
447   if (fh == NULL)
448   {
449     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
450     close(fd);
451     return (-1);
452   }
453
454   fprintf (fh, "%i\n", (int) pid);
455   fclose (fh);
456
457   return (0);
458 } /* }}} int write_pidfile */
459
460 static int remove_pidfile (void) /* {{{ */
461 {
462   char *file;
463   int status;
464
465   file = (config_pid_file != NULL)
466     ? config_pid_file
467     : LOCALSTATEDIR "/run/rrdcached.pid";
468
469   status = unlink (file);
470   if (status == 0)
471     return (0);
472   return (errno);
473 } /* }}} int remove_pidfile */
474
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
476 {
477   char *eol;
478
479   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480                sock->next_read - sock->next_cmd);
481
482   if (eol == NULL)
483   {
484     /* no commands left, move remainder back to front of rbuf */
485     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486             sock->next_read - sock->next_cmd);
487     sock->next_read -= sock->next_cmd;
488     sock->next_cmd = 0;
489     *len = 0;
490     return NULL;
491   }
492   else
493   {
494     char *cmd = sock->rbuf + sock->next_cmd;
495     *eol = '\0';
496
497     sock->next_cmd = eol - sock->rbuf + 1;
498
499     if (eol > sock->rbuf && *(eol-1) == '\r')
500       *(--eol) = '\0'; /* handle "\r\n" EOL */
501
502     *len = eol - cmd;
503
504     return cmd;
505   }
506
507   /* NOTREACHED */
508   assert(1==0);
509 } /* }}} char *next_cmd */
510
511 /* add the characters directly to the write buffer */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
513 {
514   char *new_buf;
515
516   assert(sock != NULL);
517
518   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
519   if (new_buf == NULL)
520   {
521     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
522     return -1;
523   }
524
525   strncpy(new_buf + sock->wbuf_len, str, len + 1);
526
527   sock->wbuf = new_buf;
528   sock->wbuf_len += len;
529
530   return 0;
531 } /* }}} static int add_to_wbuf */
532
533 /* add the text to the "extra" info that's sent after the status line */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
535 {
536   va_list argp;
537   char buffer[CMD_MAX];
538   int len;
539
540   if (JOURNAL_REPLAY(sock)) return 0;
541   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
542
543   va_start(argp, fmt);
544 #ifdef HAVE_VSNPRINTF
545   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
546 #else
547   len = vsprintf(buffer, fmt, argp);
548 #endif
549   va_end(argp);
550   if (len < 0)
551   {
552     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
553     return -1;
554   }
555
556   return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
558
559 static int count_lines(char *str) /* {{{ */
560 {
561   int lines = 0;
562
563   if (str != NULL)
564   {
565     while ((str = strchr(str, '\n')) != NULL)
566     {
567       ++lines;
568       ++str;
569     }
570   }
571
572   return lines;
573 } /* }}} static int count_lines */
574
575 /* send the response back to the user.
576  * returns 0 on success, -1 on error
577  * write buffer is always zeroed after this call */
578 static int send_response (listen_socket_t *sock, response_code rc,
579                           char *fmt, ...) /* {{{ */
580 {
581   va_list argp;
582   char buffer[CMD_MAX];
583   int lines;
584   ssize_t wrote;
585   int rclen, len;
586
587   if (JOURNAL_REPLAY(sock)) return rc;
588
589   if (sock->batch_start)
590   {
591     if (rc == RESP_OK)
592       return rc; /* no response on success during BATCH */
593     lines = sock->batch_cmd;
594   }
595   else if (rc == RESP_OK)
596     lines = count_lines(sock->wbuf);
597   else
598     lines = -1;
599
600   rclen = sprintf(buffer, "%d ", lines);
601   va_start(argp, fmt);
602 #ifdef HAVE_VSNPRINTF
603   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
604 #else
605   len = vsprintf(buffer+rclen, fmt, argp);
606 #endif
607   va_end(argp);
608   if (len < 0)
609     return -1;
610
611   len += rclen;
612
613   /* append the result to the wbuf, don't write to the user */
614   if (sock->batch_start)
615     return add_to_wbuf(sock, buffer, len);
616
617   /* first write must be complete */
618   if (len != write(sock->fd, buffer, len))
619   {
620     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
621     return -1;
622   }
623
624   if (sock->wbuf != NULL && rc == RESP_OK)
625   {
626     wrote = 0;
627     while (wrote < sock->wbuf_len)
628     {
629       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
630       if (wb <= 0)
631       {
632         RRDD_LOG(LOG_INFO, "send_response: could not write results");
633         return -1;
634       }
635       wrote += wb;
636     }
637   }
638
639   free(sock->wbuf); sock->wbuf = NULL;
640   sock->wbuf_len = 0;
641
642   return 0;
643 } /* }}} */
644
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
646 {
647   ci->values = NULL;
648   ci->values_num = 0;
649   ci->values_alloc = 0;
650
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
654 }
655
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
661 {
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
664
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
669
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
674
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
677
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
682
683 } /* }}} static void remove_from_queue */
684
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
689 {
690   if (ci == NULL) return NULL;
691
692   remove_from_queue(ci);
693
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
696
697   free (ci->values);
698   free (ci->file);
699
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
703
704   free (ci);
705
706   return NULL;
707 } /* }}} static void *free_cache_item */
708
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
715 {
716   if (ci == NULL)
717     return (-1);
718
719   if (ci->values_num == 0)
720     return (0);
721
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
726
727     /* remove if further down in queue */
728     remove_from_queue(ci);
729
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
735
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
744
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
747
748     ci->prev = cache_queue_tail;
749
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
754
755     cache_queue_tail = ci;
756   }
757
758   ci->flags |= CI_FLAGS_IN_QUEUE;
759
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
764
765   return (0);
766 } /* }}} int enqueue_cache_item */
767
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
775 {
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
778
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
781
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
784
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
800
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
803
804 static int flush_old_values (int max_age)
805 {
806   callback_flush_data_t cfd;
807   size_t k;
808
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
815
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
820
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
826
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
834
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
840
841   return (0);
842 } /* int flush_old_values */
843
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
845 {
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
849
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
853
854   pthread_mutex_lock(&cache_lock);
855
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
864
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
867
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
871
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
878
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
886
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
889
890   state = SHUTDOWN;
891
892   pthread_mutex_unlock(&cache_lock);
893
894   return NULL;
895 } /* void *flush_thread_main */
896
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
898 {
899   pthread_mutex_lock (&cache_lock);
900
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
909
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
921
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
926
927     ci = cache_queue_head;
928
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
936
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
939
940     values = ci->values;
941     values_num = ci->values_num;
942
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
945
946     pthread_mutex_unlock (&cache_lock);
947
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
956
957     journal_write("wrote", file);
958
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
966
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
974
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
977
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
981
982   return (NULL);
983 } /* }}} void *queue_thread_main */
984
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
987 {
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
994
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1000
1001   if (buffer_size <= 0)
1002     return (-1);
1003
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1006
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1037
1038   if (status != 0)
1039     return (status);
1040
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1044
1045   return (0);
1046 } /* }}} int buffer_get_field */
1047
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1053 {
1054   assert(file != NULL);
1055
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1060
1061   if (strstr(file, "../") != NULL) goto err;
1062
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1065
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1070
1071   return 1;
1072
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1076
1077   return 0;
1078 } /* }}} static int check_file_access */
1079
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1089 {
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1092
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1095
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1099
1100 static int flush_file (const char *filename) /* {{{ */
1101 {
1102   cache_item_t *ci;
1103
1104   pthread_mutex_lock (&cache_lock);
1105
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1112
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1119
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1122
1123   pthread_mutex_unlock(&cache_lock);
1124
1125   return (0);
1126 } /* }}} int flush_file */
1127
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1129 {
1130   char *err = "Syntax error.\n";
1131
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1134
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1137
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1139 {
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1147
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1150
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1160
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1165
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1180
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1182
1183   return (0);
1184 } /* }}} int handle_request_stats */
1185
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1187 {
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1190
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1201
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1204
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1212
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1224
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1228
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1230 {
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1232
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1236
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1239
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1241 {
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1245
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1249
1250   get_abs_path(&file, file_tmp);
1251
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1259
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1262
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1266
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1268 {
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1272
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1276
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1279
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1283
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1288
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1293
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1297
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1299 {
1300   cache_item_t *ci;
1301
1302   pthread_mutex_lock(&cache_lock);
1303
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1310
1311   pthread_mutex_unlock(&cache_lock);
1312
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1315
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1317 {
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[CMD_MAX];
1322
1323   cache_item_t *ci;
1324
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, buffer_size);
1328
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1332
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1336
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1339
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1342
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1347
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1350
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1356
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1370
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1379
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1388
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1392
1393     pthread_mutex_lock(&cache_lock);
1394
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1404
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1410
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1414
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     time_t stamp;
1419     char *eostamp;
1420
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1427
1428     /* make sure update time is always moving forward */
1429     stamp = strtol(value, &eostamp, 10);
1430     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "Cannot find timestamp in '%s'!\n", value);
1435     }
1436     else if (stamp <= ci->last_update_stamp)
1437     {
1438       pthread_mutex_unlock(&cache_lock);
1439       return send_response(sock, RESP_ERR,
1440                            "illegal attempt to update using time %ld when last"
1441                            " update time is %ld (minimum one second step)\n",
1442                            stamp, ci->last_update_stamp);
1443     }
1444     else
1445       ci->last_update_stamp = stamp;
1446
1447     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1448                               &ci->values_alloc, config_alloc_chunk))
1449     {
1450       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1451       continue;
1452     }
1453
1454     values_num++;
1455   }
1456
1457   if (((now - ci->last_flush_time) >= config_write_interval)
1458       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1459       && (ci->values_num > 0))
1460   {
1461     enqueue_cache_item (ci, TAIL);
1462   }
1463
1464   pthread_mutex_unlock (&cache_lock);
1465
1466   if (values_num < 1)
1467     return send_response(sock, RESP_ERR, "No values updated.\n");
1468   else
1469     return send_response(sock, RESP_OK,
1470                          "errors, enqueued %i value(s).\n", values_num);
1471
1472   /* NOTREACHED */
1473   assert(1==0);
1474
1475 } /* }}} int handle_request_update */
1476
1477 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1478 {
1479   char *file;
1480   char *cf;
1481
1482   char *start_str;
1483   char *end_str;
1484   rrd_time_value_t start_tv;
1485   rrd_time_value_t end_tv;
1486   time_t start_tm;
1487   time_t end_tm;
1488
1489   unsigned long step;
1490   unsigned long ds_cnt;
1491   char **ds_namv;
1492   rrd_value_t *data;
1493
1494   int status;
1495   unsigned long i;
1496   time_t t;
1497   rrd_value_t *data_ptr;
1498
1499   file = NULL;
1500   cf = NULL;
1501   start_str = NULL;
1502   end_str = NULL;
1503
1504   /* Read the arguments */
1505   do /* while (0) */
1506   {
1507     status = buffer_get_field (&buffer, &buffer_size, &file);
1508     if (status != 0)
1509       break;
1510
1511     status = buffer_get_field (&buffer, &buffer_size, &cf);
1512     if (status != 0)
1513       break;
1514
1515     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1516     if (status != 0)
1517     {
1518       start_str = NULL;
1519       status = 0;
1520       break;
1521     }
1522
1523     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1524     if (status != 0)
1525     {
1526       end_str = NULL;
1527       status = 0;
1528       break;
1529     }
1530   } while (0);
1531
1532   if (status != 0)
1533     return (syntax_error(sock,cmd));
1534
1535   status = flush_file (file);
1536   if ((status != 0) && (status != ENOENT))
1537     return (send_response (sock, RESP_ERR,
1538           "flush_file (%s) failed with status %i.\n", file, status));
1539
1540   /* Parse start time */
1541   if (start_str != NULL)
1542   {
1543     const char *errmsg;
1544
1545     errmsg = rrd_parsetime (start_str, &start_tv);
1546     if (errmsg != NULL)
1547       return (send_response(sock, RESP_ERR,
1548             "Cannot parse start time `%s': %s\n", start_str, errmsg));
1549   }
1550   else
1551     rrd_parsetime ("-86400", &start_tv);
1552
1553   /* Parse end time */
1554   if (end_str != NULL)
1555   {
1556     const char *errmsg;
1557
1558     errmsg = rrd_parsetime (end_str, &end_tv);
1559     if (errmsg != NULL)
1560       return (send_response(sock, RESP_ERR,
1561             "Cannot parse end time `%s': %s\n", end_str, errmsg));
1562   }
1563   else
1564     rrd_parsetime ("now", &end_tv);
1565
1566   start_tm = 0;
1567   end_tm = 0;
1568   status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm);
1569   if (status != 0)
1570     return (send_response(sock, RESP_ERR,
1571           "rrd_proc_start_end failed: %s\n", rrd_get_error ()));
1572
1573   step = -1;
1574   ds_cnt = 0;
1575   ds_namv = NULL;
1576   data = NULL;
1577
1578   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1579       &ds_cnt, &ds_namv, &data);
1580   if (status != 0)
1581     return (send_response(sock, RESP_ERR,
1582           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1583
1584   add_response_info (sock, "FlushVersion: %lu\n", 1);
1585   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1586   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1587   add_response_info (sock, "Step: %lu\n", step);
1588   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1589
1590 #define SSTRCAT(buffer,str,buffer_fill) do { \
1591     size_t str_len = strlen (str); \
1592     if ((buffer_fill + str_len) > sizeof (buffer)) \
1593       str_len = sizeof (buffer) - buffer_fill; \
1594     if (str_len > 0) { \
1595       strncpy (buffer + buffer_fill, str, str_len); \
1596       buffer_fill += str_len; \
1597       assert (buffer_fill <= sizeof (buffer)); \
1598       if (buffer_fill == sizeof (buffer)) \
1599         buffer[buffer_fill - 1] = 0; \
1600       else \
1601         buffer[buffer_fill] = 0; \
1602     } \
1603   } while (0)
1604
1605   { /* Add list of DS names */
1606     char linebuf[1024];
1607     size_t linebuf_fill;
1608
1609     memset (linebuf, 0, sizeof (linebuf));
1610     linebuf_fill = 0;
1611     for (i = 0; i < ds_cnt; i++)
1612     {
1613       if (i > 0)
1614         SSTRCAT (linebuf, " ", linebuf_fill);
1615       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1616     }
1617     add_response_info (sock, "DSName: %s\n", linebuf);
1618   }
1619
1620   /* Add the actual data */
1621   assert (step > 0);
1622   data_ptr = data;
1623   for (t = start_tm + step; t <= end_tm; t += step)
1624   {
1625     char linebuf[1024];
1626     size_t linebuf_fill;
1627     char tmp[128];
1628
1629     memset (linebuf, 0, sizeof (linebuf));
1630     linebuf_fill = 0;
1631     for (i = 0; i < ds_cnt; i++)
1632     {
1633       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1634       tmp[sizeof (tmp) - 1] = 0;
1635       SSTRCAT (linebuf, tmp, linebuf_fill);
1636
1637       data_ptr++;
1638     }
1639
1640     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1641   } /* for (t) */
1642
1643   return (send_response (sock, RESP_OK, "Success\n"));
1644 #undef SSTRCAT
1645 } /* }}} int handle_request_fetch */
1646
1647 /* we came across a "WROTE" entry during journal replay.
1648  * throw away any values that we have accumulated for this file
1649  */
1650 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1651 {
1652   cache_item_t *ci;
1653   const char *file = buffer;
1654
1655   pthread_mutex_lock(&cache_lock);
1656
1657   ci = g_tree_lookup(cache_tree, file);
1658   if (ci == NULL)
1659   {
1660     pthread_mutex_unlock(&cache_lock);
1661     return (0);
1662   }
1663
1664   if (ci->values)
1665     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1666
1667   wipe_ci_values(ci, now);
1668   remove_from_queue(ci);
1669
1670   pthread_mutex_unlock(&cache_lock);
1671   return (0);
1672 } /* }}} int handle_request_wrote */
1673
1674 /* start "BATCH" processing */
1675 static int batch_start (HANDLER_PROTO) /* {{{ */
1676 {
1677   int status;
1678   if (sock->batch_start)
1679     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1680
1681   status = send_response(sock, RESP_OK,
1682                          "Go ahead.  End with dot '.' on its own line.\n");
1683   sock->batch_start = time(NULL);
1684   sock->batch_cmd = 0;
1685
1686   return status;
1687 } /* }}} static int batch_start */
1688
1689 /* finish "BATCH" processing and return results to the client */
1690 static int batch_done (HANDLER_PROTO) /* {{{ */
1691 {
1692   assert(sock->batch_start);
1693   sock->batch_start = 0;
1694   sock->batch_cmd  = 0;
1695   return send_response(sock, RESP_OK, "errors\n");
1696 } /* }}} static int batch_done */
1697
1698 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1699 {
1700   return -1;
1701 } /* }}} static int handle_request_quit */
1702
1703 static command_t list_of_commands[] = { /* {{{ */
1704   {
1705     "UPDATE",
1706     handle_request_update,
1707     CMD_CONTEXT_ANY,
1708     "UPDATE <filename> <values> [<values> ...]\n"
1709     ,
1710     "Adds the given file to the internal cache if it is not yet known and\n"
1711     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1712     "for details.\n"
1713     "\n"
1714     "Each <values> has the following form:\n"
1715     "  <values> = <time>:<value>[:<value>[...]]\n"
1716     "See the rrdupdate(1) manpage for details.\n"
1717   },
1718   {
1719     "WROTE",
1720     handle_request_wrote,
1721     CMD_CONTEXT_JOURNAL,
1722     NULL,
1723     NULL
1724   },
1725   {
1726     "FLUSH",
1727     handle_request_flush,
1728     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1729     "FLUSH <filename>\n"
1730     ,
1731     "Adds the given filename to the head of the update queue and returns\n"
1732     "after it has been dequeued.\n"
1733   },
1734   {
1735     "FLUSHALL",
1736     handle_request_flushall,
1737     CMD_CONTEXT_CLIENT,
1738     "FLUSHALL\n"
1739     ,
1740     "Triggers writing of all pending updates.  Returns immediately.\n"
1741   },
1742   {
1743     "PENDING",
1744     handle_request_pending,
1745     CMD_CONTEXT_CLIENT,
1746     "PENDING <filename>\n"
1747     ,
1748     "Shows any 'pending' updates for a file, in order.\n"
1749     "The updates shown have not yet been written to the underlying RRD file.\n"
1750   },
1751   {
1752     "FORGET",
1753     handle_request_forget,
1754     CMD_CONTEXT_ANY,
1755     "FORGET <filename>\n"
1756     ,
1757     "Removes the file completely from the cache.\n"
1758     "Any pending updates for the file will be lost.\n"
1759   },
1760   {
1761     "QUEUE",
1762     handle_request_queue,
1763     CMD_CONTEXT_CLIENT,
1764     "QUEUE\n"
1765     ,
1766         "Shows all files in the output queue.\n"
1767     "The output is zero or more lines in the following format:\n"
1768     "(where <num_vals> is the number of values to be written)\n"
1769     "\n"
1770     "<num_vals> <filename>\n"
1771   },
1772   {
1773     "STATS",
1774     handle_request_stats,
1775     CMD_CONTEXT_CLIENT,
1776     "STATS\n"
1777     ,
1778     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1779     "a description of the values.\n"
1780   },
1781   {
1782     "HELP",
1783     handle_request_help,
1784     CMD_CONTEXT_CLIENT,
1785     "HELP [<command>]\n",
1786     NULL, /* special! */
1787   },
1788   {
1789     "BATCH",
1790     batch_start,
1791     CMD_CONTEXT_CLIENT,
1792     "BATCH\n"
1793     ,
1794     "The 'BATCH' command permits the client to initiate a bulk load\n"
1795     "   of commands to rrdcached.\n"
1796     "\n"
1797     "Usage:\n"
1798     "\n"
1799     "    client: BATCH\n"
1800     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1801     "    client: command #1\n"
1802     "    client: command #2\n"
1803     "    client: ... and so on\n"
1804     "    client: .\n"
1805     "    server: 2 errors\n"
1806     "    server: 7 message for command #7\n"
1807     "    server: 9 message for command #9\n"
1808     "\n"
1809     "For more information, consult the rrdcached(1) documentation.\n"
1810   },
1811   {
1812     ".",   /* BATCH terminator */
1813     batch_done,
1814     CMD_CONTEXT_BATCH,
1815     NULL,
1816     NULL
1817   },
1818   {
1819     "FETCH",
1820     handle_request_fetch,
1821     CMD_CONTEXT_CLIENT,
1822     "FETCH <file> <CF> [<start> [<end>]]\n"
1823     ,
1824     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1825   },
1826   {
1827     "QUIT",
1828     handle_request_quit,
1829     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1830     "QUIT\n"
1831     ,
1832     "Disconnect from rrdcached.\n"
1833   }
1834 }; /* }}} command_t list_of_commands[] */
1835 static size_t list_of_commands_len = sizeof (list_of_commands)
1836   / sizeof (list_of_commands[0]);
1837
1838 static command_t *find_command(char *cmd)
1839 {
1840   size_t i;
1841
1842   for (i = 0; i < list_of_commands_len; i++)
1843     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1844       return (&list_of_commands[i]);
1845   return NULL;
1846 }
1847
1848 /* We currently use the index in the `list_of_commands' array as a bit position
1849  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1850  * outside these functions so that switching to a more elegant storage method
1851  * is easily possible. */
1852 static ssize_t find_command_index (const char *cmd) /* {{{ */
1853 {
1854   size_t i;
1855
1856   for (i = 0; i < list_of_commands_len; i++)
1857     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1858       return ((ssize_t) i);
1859   return (-1);
1860 } /* }}} ssize_t find_command_index */
1861
1862 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1863     const char *cmd)
1864 {
1865   ssize_t i;
1866
1867   if (JOURNAL_REPLAY(sock))
1868     return (1);
1869
1870   if (cmd == NULL)
1871     return (-1);
1872
1873   if ((strcasecmp ("QUIT", cmd) == 0)
1874       || (strcasecmp ("HELP", cmd) == 0))
1875     return (1);
1876   else if (strcmp (".", cmd) == 0)
1877     cmd = "BATCH";
1878
1879   i = find_command_index (cmd);
1880   if (i < 0)
1881     return (-1);
1882   assert (i < 32);
1883
1884   if ((sock->permissions & (1 << i)) != 0)
1885     return (1);
1886   return (0);
1887 } /* }}} int socket_permission_check */
1888
1889 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1890     const char *cmd)
1891 {
1892   ssize_t i;
1893
1894   i = find_command_index (cmd);
1895   if (i < 0)
1896     return (-1);
1897   assert (i < 32);
1898
1899   sock->permissions |= (1 << i);
1900   return (0);
1901 } /* }}} int socket_permission_add */
1902
1903 /* check whether commands are received in the expected context */
1904 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1905 {
1906   if (JOURNAL_REPLAY(sock))
1907     return (cmd->context & CMD_CONTEXT_JOURNAL);
1908   else if (sock->batch_start)
1909     return (cmd->context & CMD_CONTEXT_BATCH);
1910   else
1911     return (cmd->context & CMD_CONTEXT_CLIENT);
1912
1913   /* NOTREACHED */
1914   assert(1==0);
1915 }
1916
1917 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1918 {
1919   int status;
1920   char *cmd_str;
1921   char *resp_txt;
1922   command_t *help = NULL;
1923
1924   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1925   if (status == 0)
1926     help = find_command(cmd_str);
1927
1928   if (help && (help->syntax || help->help))
1929   {
1930     char tmp[CMD_MAX];
1931
1932     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1933     resp_txt = tmp;
1934
1935     if (help->syntax)
1936       add_response_info(sock, "Usage: %s\n", help->syntax);
1937
1938     if (help->help)
1939       add_response_info(sock, "%s\n", help->help);
1940   }
1941   else
1942   {
1943     size_t i;
1944
1945     resp_txt = "Command overview\n";
1946
1947     for (i = 0; i < list_of_commands_len; i++)
1948     {
1949       if (list_of_commands[i].syntax == NULL)
1950         continue;
1951       add_response_info (sock, "%s", list_of_commands[i].syntax);
1952     }
1953   }
1954
1955   return send_response(sock, RESP_OK, resp_txt);
1956 } /* }}} int handle_request_help */
1957
1958 static int handle_request (DISPATCH_PROTO) /* {{{ */
1959 {
1960   char *buffer_ptr = buffer;
1961   char *cmd_str = NULL;
1962   command_t *cmd = NULL;
1963   int status;
1964
1965   assert (buffer[buffer_size - 1] == '\0');
1966
1967   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1968   if (status != 0)
1969   {
1970     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1971     return (-1);
1972   }
1973
1974   if (sock != NULL && sock->batch_start)
1975     sock->batch_cmd++;
1976
1977   cmd = find_command(cmd_str);
1978   if (!cmd)
1979     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1980
1981   if (!socket_permission_check (sock, cmd->cmd))
1982     return send_response(sock, RESP_ERR, "Permission denied.\n");
1983
1984   if (!command_check_context(sock, cmd))
1985     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1986
1987   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1988 } /* }}} int handle_request */
1989
1990 static void journal_set_free (journal_set *js) /* {{{ */
1991 {
1992   if (js == NULL)
1993     return;
1994
1995   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1996
1997   free(js);
1998 } /* }}} journal_set_free */
1999
2000 static void journal_set_remove (journal_set *js) /* {{{ */
2001 {
2002   if (js == NULL)
2003     return;
2004
2005   for (uint i=0; i < js->files_num; i++)
2006   {
2007     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2008     unlink(js->files[i]);
2009   }
2010 } /* }}} journal_set_remove */
2011
2012 /* close current journal file handle.
2013  * MUST hold journal_lock before calling */
2014 static void journal_close(void) /* {{{ */
2015 {
2016   if (journal_fh != NULL)
2017   {
2018     if (fclose(journal_fh) != 0)
2019       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2020   }
2021
2022   journal_fh = NULL;
2023   journal_size = 0;
2024 } /* }}} journal_close */
2025
2026 /* MUST hold journal_lock before calling */
2027 static void journal_new_file(void) /* {{{ */
2028 {
2029   struct timeval now;
2030   int  new_fd;
2031   char new_file[PATH_MAX + 1];
2032
2033   assert(journal_dir != NULL);
2034   assert(journal_cur != NULL);
2035
2036   journal_close();
2037
2038   gettimeofday(&now, NULL);
2039   /* this format assures that the files sort in strcmp() order */
2040   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2041            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2042
2043   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2044                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2045   if (new_fd < 0)
2046     goto error;
2047
2048   journal_fh = fdopen(new_fd, "a");
2049   if (journal_fh == NULL)
2050     goto error;
2051
2052   journal_size = ftell(journal_fh);
2053   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2054
2055   /* record the file in the journal set */
2056   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2057
2058   return;
2059
2060 error:
2061   RRDD_LOG(LOG_CRIT,
2062            "JOURNALING DISABLED: Error while trying to create %s : %s",
2063            new_file, rrd_strerror(errno));
2064   RRDD_LOG(LOG_CRIT,
2065            "JOURNALING DISABLED: All values will be flushed at shutdown");
2066
2067   close(new_fd);
2068   config_flush_at_shutdown = 1;
2069
2070 } /* }}} journal_new_file */
2071
2072 /* MUST NOT hold journal_lock before calling this */
2073 static void journal_rotate(void) /* {{{ */
2074 {
2075   journal_set *old_js = NULL;
2076
2077   if (journal_dir == NULL)
2078     return;
2079
2080   RRDD_LOG(LOG_DEBUG, "rotating journals");
2081
2082   pthread_mutex_lock(&stats_lock);
2083   ++stats_journal_rotate;
2084   pthread_mutex_unlock(&stats_lock);
2085
2086   pthread_mutex_lock(&journal_lock);
2087
2088   journal_close();
2089
2090   /* rotate the journal sets */
2091   old_js = journal_old;
2092   journal_old = journal_cur;
2093   journal_cur = calloc(1, sizeof(journal_set));
2094
2095   if (journal_cur != NULL)
2096     journal_new_file();
2097   else
2098     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2099
2100   pthread_mutex_unlock(&journal_lock);
2101
2102   journal_set_remove(old_js);
2103   journal_set_free  (old_js);
2104
2105 } /* }}} static void journal_rotate */
2106
2107 /* MUST hold journal_lock when calling */
2108 static void journal_done(void) /* {{{ */
2109 {
2110   if (journal_cur == NULL)
2111     return;
2112
2113   journal_close();
2114
2115   if (config_flush_at_shutdown)
2116   {
2117     RRDD_LOG(LOG_INFO, "removing journals");
2118     journal_set_remove(journal_old);
2119     journal_set_remove(journal_cur);
2120   }
2121   else
2122   {
2123     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2124              "journals will be used at next startup");
2125   }
2126
2127   journal_set_free(journal_cur);
2128   journal_set_free(journal_old);
2129   free(journal_dir);
2130
2131 } /* }}} static void journal_done */
2132
2133 static int journal_write(char *cmd, char *args) /* {{{ */
2134 {
2135   int chars;
2136
2137   if (journal_fh == NULL)
2138     return 0;
2139
2140   pthread_mutex_lock(&journal_lock);
2141   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2142   journal_size += chars;
2143
2144   if (journal_size > JOURNAL_MAX)
2145     journal_new_file();
2146
2147   pthread_mutex_unlock(&journal_lock);
2148
2149   if (chars > 0)
2150   {
2151     pthread_mutex_lock(&stats_lock);
2152     stats_journal_bytes += chars;
2153     pthread_mutex_unlock(&stats_lock);
2154   }
2155
2156   return chars;
2157 } /* }}} static int journal_write */
2158
2159 static int journal_replay (const char *file) /* {{{ */
2160 {
2161   FILE *fh;
2162   int entry_cnt = 0;
2163   int fail_cnt = 0;
2164   uint64_t line = 0;
2165   char entry[CMD_MAX];
2166   time_t now;
2167
2168   if (file == NULL) return 0;
2169
2170   {
2171     char *reason = "unknown error";
2172     int status = 0;
2173     struct stat statbuf;
2174
2175     memset(&statbuf, 0, sizeof(statbuf));
2176     if (stat(file, &statbuf) != 0)
2177     {
2178       reason = "stat error";
2179       status = errno;
2180     }
2181     else if (!S_ISREG(statbuf.st_mode))
2182     {
2183       reason = "not a regular file";
2184       status = EPERM;
2185     }
2186     if (statbuf.st_uid != daemon_uid)
2187     {
2188       reason = "not owned by daemon user";
2189       status = EACCES;
2190     }
2191     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2192     {
2193       reason = "must not be user/group writable";
2194       status = EACCES;
2195     }
2196
2197     if (status != 0)
2198     {
2199       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2200                file, rrd_strerror(status), reason);
2201       return 0;
2202     }
2203   }
2204
2205   fh = fopen(file, "r");
2206   if (fh == NULL)
2207   {
2208     if (errno != ENOENT)
2209       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2210                file, rrd_strerror(errno));
2211     return 0;
2212   }
2213   else
2214     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2215
2216   now = time(NULL);
2217
2218   while(!feof(fh))
2219   {
2220     size_t entry_len;
2221
2222     ++line;
2223     if (fgets(entry, sizeof(entry), fh) == NULL)
2224       break;
2225     entry_len = strlen(entry);
2226
2227     /* check \n termination in case journal writing crashed mid-line */
2228     if (entry_len == 0)
2229       continue;
2230     else if (entry[entry_len - 1] != '\n')
2231     {
2232       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2233       ++fail_cnt;
2234       continue;
2235     }
2236
2237     entry[entry_len - 1] = '\0';
2238
2239     if (handle_request(NULL, now, entry, entry_len) == 0)
2240       ++entry_cnt;
2241     else
2242       ++fail_cnt;
2243   }
2244
2245   fclose(fh);
2246
2247   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2248            entry_cnt, fail_cnt);
2249
2250   return entry_cnt > 0 ? 1 : 0;
2251 } /* }}} static int journal_replay */
2252
2253 static int journal_sort(const void *v1, const void *v2)
2254 {
2255   char **jn1 = (char **) v1;
2256   char **jn2 = (char **) v2;
2257
2258   return strcmp(*jn1,*jn2);
2259 }
2260
2261 static void journal_init(void) /* {{{ */
2262 {
2263   int had_journal = 0;
2264   DIR *dir;
2265   struct dirent *dent;
2266   char path[PATH_MAX+1];
2267
2268   if (journal_dir == NULL) return;
2269
2270   pthread_mutex_lock(&journal_lock);
2271
2272   journal_cur = calloc(1, sizeof(journal_set));
2273   if (journal_cur == NULL)
2274   {
2275     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2276     return;
2277   }
2278
2279   RRDD_LOG(LOG_INFO, "checking for journal files");
2280
2281   /* Handle old journal files during transition.  This gives them the
2282    * correct sort order.  TODO: remove after first release
2283    */
2284   {
2285     char old_path[PATH_MAX+1];
2286     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2287     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2288     rename(old_path, path);
2289
2290     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2291     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2292     rename(old_path, path);
2293   }
2294
2295   dir = opendir(journal_dir);
2296   while ((dent = readdir(dir)) != NULL)
2297   {
2298     /* looks like a journal file? */
2299     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2300       continue;
2301
2302     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2303
2304     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2305     {
2306       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2307                dent->d_name);
2308       break;
2309     }
2310   }
2311   closedir(dir);
2312
2313   qsort(journal_cur->files, journal_cur->files_num,
2314         sizeof(journal_cur->files[0]), journal_sort);
2315
2316   for (uint i=0; i < journal_cur->files_num; i++)
2317     had_journal += journal_replay(journal_cur->files[i]);
2318
2319   journal_new_file();
2320
2321   /* it must have been a crash.  start a flush */
2322   if (had_journal && config_flush_at_shutdown)
2323     flush_old_values(-1);
2324
2325   pthread_mutex_unlock(&journal_lock);
2326
2327   RRDD_LOG(LOG_INFO, "journal processing complete");
2328
2329 } /* }}} static void journal_init */
2330
2331 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2332 {
2333   assert(sock != NULL);
2334
2335   free(sock->rbuf);  sock->rbuf = NULL;
2336   free(sock->wbuf);  sock->wbuf = NULL;
2337   free(sock);
2338 } /* }}} void free_listen_socket */
2339
2340 static void close_connection(listen_socket_t *sock) /* {{{ */
2341 {
2342   if (sock->fd >= 0)
2343   {
2344     close(sock->fd);
2345     sock->fd = -1;
2346   }
2347
2348   free_listen_socket(sock);
2349
2350 } /* }}} void close_connection */
2351
2352 static void *connection_thread_main (void *args) /* {{{ */
2353 {
2354   listen_socket_t *sock;
2355   int fd;
2356
2357   sock = (listen_socket_t *) args;
2358   fd = sock->fd;
2359
2360   /* init read buffers */
2361   sock->next_read = sock->next_cmd = 0;
2362   sock->rbuf = malloc(RBUF_SIZE);
2363   if (sock->rbuf == NULL)
2364   {
2365     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2366     close_connection(sock);
2367     return NULL;
2368   }
2369
2370   pthread_mutex_lock (&connection_threads_lock);
2371   connection_threads_num++;
2372   pthread_mutex_unlock (&connection_threads_lock);
2373
2374   while (state == RUNNING)
2375   {
2376     char *cmd;
2377     ssize_t cmd_len;
2378     ssize_t rbytes;
2379     time_t now;
2380
2381     struct pollfd pollfd;
2382     int status;
2383
2384     pollfd.fd = fd;
2385     pollfd.events = POLLIN | POLLPRI;
2386     pollfd.revents = 0;
2387
2388     status = poll (&pollfd, 1, /* timeout = */ 500);
2389     if (state != RUNNING)
2390       break;
2391     else if (status == 0) /* timeout */
2392       continue;
2393     else if (status < 0) /* error */
2394     {
2395       status = errno;
2396       if (status != EINTR)
2397         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2398       continue;
2399     }
2400
2401     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2402       break;
2403     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2404     {
2405       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2406           "poll(2) returned something unexpected: %#04hx",
2407           pollfd.revents);
2408       break;
2409     }
2410
2411     rbytes = read(fd, sock->rbuf + sock->next_read,
2412                   RBUF_SIZE - sock->next_read);
2413     if (rbytes < 0)
2414     {
2415       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2416       break;
2417     }
2418     else if (rbytes == 0)
2419       break; /* eof */
2420
2421     sock->next_read += rbytes;
2422
2423     if (sock->batch_start)
2424       now = sock->batch_start;
2425     else
2426       now = time(NULL);
2427
2428     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2429     {
2430       status = handle_request (sock, now, cmd, cmd_len+1);
2431       if (status != 0)
2432         goto out_close;
2433     }
2434   }
2435
2436 out_close:
2437   close_connection(sock);
2438
2439   /* Remove this thread from the connection threads list */
2440   pthread_mutex_lock (&connection_threads_lock);
2441   connection_threads_num--;
2442   if (connection_threads_num <= 0)
2443     pthread_cond_broadcast(&connection_threads_done);
2444   pthread_mutex_unlock (&connection_threads_lock);
2445
2446   return (NULL);
2447 } /* }}} void *connection_thread_main */
2448
2449 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2450 {
2451   int fd;
2452   struct sockaddr_un sa;
2453   listen_socket_t *temp;
2454   int status;
2455   const char *path;
2456   char *path_copy, *dir;
2457
2458   path = sock->addr;
2459   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2460     path += strlen("unix:");
2461
2462   /* dirname may modify its argument */
2463   path_copy = strdup(path);
2464   if (path_copy == NULL)
2465   {
2466     fprintf(stderr, "rrdcached: strdup(): %s\n",
2467         rrd_strerror(errno));
2468     return (-1);
2469   }
2470
2471   dir = dirname(path_copy);
2472   if (rrd_mkdir_p(dir, 0777) != 0)
2473   {
2474     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2475         dir, rrd_strerror(errno));
2476     return (-1);
2477   }
2478
2479   free(path_copy);
2480
2481   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2482       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2483   if (temp == NULL)
2484   {
2485     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2486     return (-1);
2487   }
2488   listen_fds = temp;
2489   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2490
2491   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2492   if (fd < 0)
2493   {
2494     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2495              rrd_strerror(errno));
2496     return (-1);
2497   }
2498
2499   memset (&sa, 0, sizeof (sa));
2500   sa.sun_family = AF_UNIX;
2501   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2502
2503   /* if we've gotten this far, we own the pid file.  any daemon started
2504    * with the same args must not be alive.  therefore, ensure that we can
2505    * create the socket...
2506    */
2507   unlink(path);
2508
2509   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2510   if (status != 0)
2511   {
2512     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2513              path, rrd_strerror(errno));
2514     close (fd);
2515     return (-1);
2516   }
2517
2518   /* tweak the sockets group ownership */
2519   if (sock->socket_group != (gid_t)-1)
2520   {
2521     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2522          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2523     {
2524       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2525     }
2526   }
2527
2528   if (sock->socket_permissions != (mode_t)-1)
2529   {
2530     if (chmod(path, sock->socket_permissions) != 0)
2531       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2532           (unsigned int)sock->socket_permissions, strerror(errno));
2533   }
2534
2535   status = listen (fd, /* backlog = */ 10);
2536   if (status != 0)
2537   {
2538     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2539              path, rrd_strerror(errno));
2540     close (fd);
2541     unlink (path);
2542     return (-1);
2543   }
2544
2545   listen_fds[listen_fds_num].fd = fd;
2546   listen_fds[listen_fds_num].family = PF_UNIX;
2547   strncpy(listen_fds[listen_fds_num].addr, path,
2548           sizeof (listen_fds[listen_fds_num].addr) - 1);
2549   listen_fds_num++;
2550
2551   return (0);
2552 } /* }}} int open_listen_socket_unix */
2553
2554 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2555 {
2556   struct addrinfo ai_hints;
2557   struct addrinfo *ai_res;
2558   struct addrinfo *ai_ptr;
2559   char addr_copy[NI_MAXHOST];
2560   char *addr;
2561   char *port;
2562   int status;
2563
2564   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2565   addr_copy[sizeof (addr_copy) - 1] = 0;
2566   addr = addr_copy;
2567
2568   memset (&ai_hints, 0, sizeof (ai_hints));
2569   ai_hints.ai_flags = 0;
2570 #ifdef AI_ADDRCONFIG
2571   ai_hints.ai_flags |= AI_ADDRCONFIG;
2572 #endif
2573   ai_hints.ai_family = AF_UNSPEC;
2574   ai_hints.ai_socktype = SOCK_STREAM;
2575
2576   port = NULL;
2577   if (*addr == '[') /* IPv6+port format */
2578   {
2579     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2580     addr++;
2581
2582     port = strchr (addr, ']');
2583     if (port == NULL)
2584     {
2585       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2586       return (-1);
2587     }
2588     *port = 0;
2589     port++;
2590
2591     if (*port == ':')
2592       port++;
2593     else if (*port == 0)
2594       port = NULL;
2595     else
2596     {
2597       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2598       return (-1);
2599     }
2600   } /* if (*addr == '[') */
2601   else
2602   {
2603     port = rindex(addr, ':');
2604     if (port != NULL)
2605     {
2606       *port = 0;
2607       port++;
2608     }
2609   }
2610   ai_res = NULL;
2611   status = getaddrinfo (addr,
2612                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2613                         &ai_hints, &ai_res);
2614   if (status != 0)
2615   {
2616     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2617              addr, gai_strerror (status));
2618     return (-1);
2619   }
2620
2621   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2622   {
2623     int fd;
2624     listen_socket_t *temp;
2625     int one = 1;
2626
2627     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2628         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2629     if (temp == NULL)
2630     {
2631       fprintf (stderr,
2632                "rrdcached: open_listen_socket_network: realloc failed.\n");
2633       continue;
2634     }
2635     listen_fds = temp;
2636     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2637
2638     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2639     if (fd < 0)
2640     {
2641       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2642                rrd_strerror(errno));
2643       continue;
2644     }
2645
2646     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2647
2648     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2649     if (status != 0)
2650     {
2651       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2652                sock->addr, rrd_strerror(errno));
2653       close (fd);
2654       continue;
2655     }
2656
2657     status = listen (fd, /* backlog = */ 10);
2658     if (status != 0)
2659     {
2660       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2661                sock->addr, rrd_strerror(errno));
2662       close (fd);
2663       freeaddrinfo(ai_res);
2664       return (-1);
2665     }
2666
2667     listen_fds[listen_fds_num].fd = fd;
2668     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2669     listen_fds_num++;
2670   } /* for (ai_ptr) */
2671
2672   freeaddrinfo(ai_res);
2673   return (0);
2674 } /* }}} static int open_listen_socket_network */
2675
2676 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2677 {
2678   assert(sock != NULL);
2679   assert(sock->addr != NULL);
2680
2681   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2682       || sock->addr[0] == '/')
2683     return (open_listen_socket_unix(sock));
2684   else
2685     return (open_listen_socket_network(sock));
2686 } /* }}} int open_listen_socket */
2687
2688 static int close_listen_sockets (void) /* {{{ */
2689 {
2690   size_t i;
2691
2692   for (i = 0; i < listen_fds_num; i++)
2693   {
2694     close (listen_fds[i].fd);
2695
2696     if (listen_fds[i].family == PF_UNIX)
2697       unlink(listen_fds[i].addr);
2698   }
2699
2700   free (listen_fds);
2701   listen_fds = NULL;
2702   listen_fds_num = 0;
2703
2704   return (0);
2705 } /* }}} int close_listen_sockets */
2706
2707 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2708 {
2709   struct pollfd *pollfds;
2710   int pollfds_num;
2711   int status;
2712   int i;
2713
2714   if (listen_fds_num < 1)
2715   {
2716     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2717     return (NULL);
2718   }
2719
2720   pollfds_num = listen_fds_num;
2721   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2722   if (pollfds == NULL)
2723   {
2724     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2725     return (NULL);
2726   }
2727   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2728
2729   RRDD_LOG(LOG_INFO, "listening for connections");
2730
2731   while (state == RUNNING)
2732   {
2733     for (i = 0; i < pollfds_num; i++)
2734     {
2735       pollfds[i].fd = listen_fds[i].fd;
2736       pollfds[i].events = POLLIN | POLLPRI;
2737       pollfds[i].revents = 0;
2738     }
2739
2740     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2741     if (state != RUNNING)
2742       break;
2743     else if (status == 0) /* timeout */
2744       continue;
2745     else if (status < 0) /* error */
2746     {
2747       status = errno;
2748       if (status != EINTR)
2749       {
2750         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2751       }
2752       continue;
2753     }
2754
2755     for (i = 0; i < pollfds_num; i++)
2756     {
2757       listen_socket_t *client_sock;
2758       struct sockaddr_storage client_sa;
2759       socklen_t client_sa_size;
2760       pthread_t tid;
2761       pthread_attr_t attr;
2762
2763       if (pollfds[i].revents == 0)
2764         continue;
2765
2766       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2767       {
2768         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2769             "poll(2) returned something unexpected for listen FD #%i.",
2770             pollfds[i].fd);
2771         continue;
2772       }
2773
2774       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2775       if (client_sock == NULL)
2776       {
2777         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2778         continue;
2779       }
2780       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2781
2782       client_sa_size = sizeof (client_sa);
2783       client_sock->fd = accept (pollfds[i].fd,
2784           (struct sockaddr *) &client_sa, &client_sa_size);
2785       if (client_sock->fd < 0)
2786       {
2787         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2788         free(client_sock);
2789         continue;
2790       }
2791
2792       pthread_attr_init (&attr);
2793       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2794
2795       status = pthread_create (&tid, &attr, connection_thread_main,
2796                                client_sock);
2797       if (status != 0)
2798       {
2799         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2800         close_connection(client_sock);
2801         continue;
2802       }
2803     } /* for (pollfds_num) */
2804   } /* while (state == RUNNING) */
2805
2806   RRDD_LOG(LOG_INFO, "starting shutdown");
2807
2808   close_listen_sockets ();
2809
2810   pthread_mutex_lock (&connection_threads_lock);
2811   while (connection_threads_num > 0)
2812     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2813   pthread_mutex_unlock (&connection_threads_lock);
2814
2815   free(pollfds);
2816
2817   return (NULL);
2818 } /* }}} void *listen_thread_main */
2819
2820 static int daemonize (void) /* {{{ */
2821 {
2822   int pid_fd;
2823   char *base_dir;
2824
2825   daemon_uid = geteuid();
2826
2827   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2828   if (pid_fd < 0)
2829     pid_fd = check_pidfile();
2830   if (pid_fd < 0)
2831     return pid_fd;
2832
2833   /* open all the listen sockets */
2834   if (config_listen_address_list_len > 0)
2835   {
2836     for (size_t i = 0; i < config_listen_address_list_len; i++)
2837       open_listen_socket (config_listen_address_list[i]);
2838
2839     rrd_free_ptrs((void ***) &config_listen_address_list,
2840                   &config_listen_address_list_len);
2841   }
2842   else
2843   {
2844     listen_socket_t sock;
2845     memset(&sock, 0, sizeof(sock));
2846     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2847     open_listen_socket (&sock);
2848   }
2849
2850   if (listen_fds_num < 1)
2851   {
2852     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2853     goto error;
2854   }
2855
2856   if (!stay_foreground)
2857   {
2858     pid_t child;
2859
2860     child = fork ();
2861     if (child < 0)
2862     {
2863       fprintf (stderr, "daemonize: fork(2) failed.\n");
2864       goto error;
2865     }
2866     else if (child > 0)
2867       exit(0);
2868
2869     /* Become session leader */
2870     setsid ();
2871
2872     /* Open the first three file descriptors to /dev/null */
2873     close (2);
2874     close (1);
2875     close (0);
2876
2877     open ("/dev/null", O_RDWR);
2878     if (dup(0) == -1 || dup(0) == -1){
2879         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2880     }
2881   } /* if (!stay_foreground) */
2882
2883   /* Change into the /tmp directory. */
2884   base_dir = (config_base_dir != NULL)
2885     ? config_base_dir
2886     : "/tmp";
2887
2888   if (chdir (base_dir) != 0)
2889   {
2890     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2891     goto error;
2892   }
2893
2894   install_signal_handlers();
2895
2896   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2897   RRDD_LOG(LOG_INFO, "starting up");
2898
2899   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2900                                 (GDestroyNotify) free_cache_item);
2901   if (cache_tree == NULL)
2902   {
2903     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2904     goto error;
2905   }
2906
2907   return write_pidfile (pid_fd);
2908
2909 error:
2910   remove_pidfile();
2911   return -1;
2912 } /* }}} int daemonize */
2913
2914 static int cleanup (void) /* {{{ */
2915 {
2916   pthread_cond_broadcast (&flush_cond);
2917   pthread_join (flush_thread, NULL);
2918
2919   pthread_cond_broadcast (&queue_cond);
2920   for (int i = 0; i < config_queue_threads; i++)
2921     pthread_join (queue_threads[i], NULL);
2922
2923   if (config_flush_at_shutdown)
2924   {
2925     assert(cache_queue_head == NULL);
2926     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2927   }
2928
2929   free(queue_threads);
2930   free(config_base_dir);
2931
2932   pthread_mutex_lock(&cache_lock);
2933   g_tree_destroy(cache_tree);
2934
2935   pthread_mutex_lock(&journal_lock);
2936   journal_done();
2937
2938   RRDD_LOG(LOG_INFO, "goodbye");
2939   closelog ();
2940
2941   remove_pidfile ();
2942   free(config_pid_file);
2943
2944   return (0);
2945 } /* }}} int cleanup */
2946
2947 static int read_options (int argc, char **argv) /* {{{ */
2948 {
2949   int option;
2950   int status = 0;
2951
2952   char **permissions = NULL;
2953   size_t permissions_len = 0;
2954
2955   gid_t  socket_group = (gid_t)-1;
2956   mode_t socket_permissions = (mode_t)-1;
2957
2958   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2959   {
2960     switch (option)
2961     {
2962       case 'g':
2963         stay_foreground=1;
2964         break;
2965
2966       case 'l':
2967       {
2968         listen_socket_t *new;
2969
2970         new = malloc(sizeof(listen_socket_t));
2971         if (new == NULL)
2972         {
2973           fprintf(stderr, "read_options: malloc failed.\n");
2974           return(2);
2975         }
2976         memset(new, 0, sizeof(listen_socket_t));
2977
2978         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2979
2980         /* Add permissions to the socket {{{ */
2981         if (permissions_len != 0)
2982         {
2983           size_t i;
2984           for (i = 0; i < permissions_len; i++)
2985           {
2986             status = socket_permission_add (new, permissions[i]);
2987             if (status != 0)
2988             {
2989               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2990                   "socket failed. Most likely, this permission doesn't "
2991                   "exist. Check your command line.\n", permissions[i]);
2992               status = 4;
2993             }
2994           }
2995         }
2996         else /* if (permissions_len == 0) */
2997         {
2998           /* Add permission for ALL commands to the socket. */
2999           size_t i;
3000           for (i = 0; i < list_of_commands_len; i++)
3001           {
3002             status = socket_permission_add (new, list_of_commands[i].cmd);
3003             if (status != 0)
3004             {
3005               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3006                   "socket failed. This should never happen, ever! Sorry.\n",
3007                   permissions[i]);
3008               status = 4;
3009             }
3010           }
3011         }
3012         /* }}} Done adding permissions. */
3013
3014         new->socket_group = socket_group;
3015         new->socket_permissions = socket_permissions;
3016
3017         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3018                          &config_listen_address_list_len, new))
3019         {
3020           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3021           return (2);
3022         }
3023       }
3024       break;
3025
3026       /* set socket group permissions */
3027       case 's':
3028       {
3029         gid_t group_gid;
3030         struct group *grp;
3031
3032         group_gid = strtoul(optarg, NULL, 10);
3033         if (errno != EINVAL && group_gid>0)
3034         {
3035           /* we were passed a number */
3036           grp = getgrgid(group_gid);
3037         }
3038         else
3039         {
3040           grp = getgrnam(optarg);
3041         }
3042
3043         if (grp)
3044         {
3045           socket_group = grp->gr_gid;
3046         }
3047         else
3048         {
3049           /* no idea what the user wanted... */
3050           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3051           return (5);
3052         }
3053       }
3054       break;
3055
3056       /* set socket file permissions */
3057       case 'm':
3058       {
3059         long  tmp;
3060         char *endptr = NULL;
3061
3062         tmp = strtol (optarg, &endptr, 8);
3063         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3064             || (tmp > 07777) || (tmp < 0)) {
3065           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3066               optarg);
3067           return (5);
3068         }
3069
3070         socket_permissions = (mode_t)tmp;
3071       }
3072       break;
3073
3074       case 'P':
3075       {
3076         char *optcopy;
3077         char *saveptr;
3078         char *dummy;
3079         char *ptr;
3080
3081         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3082
3083         optcopy = strdup (optarg);
3084         dummy = optcopy;
3085         saveptr = NULL;
3086         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3087         {
3088           dummy = NULL;
3089           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3090         }
3091
3092         free (optcopy);
3093       }
3094       break;
3095
3096       case 'f':
3097       {
3098         int temp;
3099
3100         temp = atoi (optarg);
3101         if (temp > 0)
3102           config_flush_interval = temp;
3103         else
3104         {
3105           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3106           status = 3;
3107         }
3108       }
3109       break;
3110
3111       case 'w':
3112       {
3113         int temp;
3114
3115         temp = atoi (optarg);
3116         if (temp > 0)
3117           config_write_interval = temp;
3118         else
3119         {
3120           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3121           status = 2;
3122         }
3123       }
3124       break;
3125
3126       case 'z':
3127       {
3128         int temp;
3129
3130         temp = atoi(optarg);
3131         if (temp > 0)
3132           config_write_jitter = temp;
3133         else
3134         {
3135           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3136           status = 2;
3137         }
3138
3139         break;
3140       }
3141
3142       case 't':
3143       {
3144         int threads;
3145         threads = atoi(optarg);
3146         if (threads >= 1)
3147           config_queue_threads = threads;
3148         else
3149         {
3150           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3151           return 1;
3152         }
3153       }
3154       break;
3155
3156       case 'B':
3157         config_write_base_only = 1;
3158         break;
3159
3160       case 'b':
3161       {
3162         size_t len;
3163         char base_realpath[PATH_MAX];
3164
3165         if (config_base_dir != NULL)
3166           free (config_base_dir);
3167         config_base_dir = strdup (optarg);
3168         if (config_base_dir == NULL)
3169         {
3170           fprintf (stderr, "read_options: strdup failed.\n");
3171           return (3);
3172         }
3173
3174         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3175         {
3176           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3177               config_base_dir, rrd_strerror (errno));
3178           return (3);
3179         }
3180
3181         /* make sure that the base directory is not resolved via
3182          * symbolic links.  this makes some performance-enhancing
3183          * assumptions possible (we don't have to resolve paths
3184          * that start with a "/")
3185          */
3186         if (realpath(config_base_dir, base_realpath) == NULL)
3187         {
3188           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3189               "%s\n", config_base_dir, rrd_strerror(errno));
3190           return 5;
3191         }
3192
3193         len = strlen (config_base_dir);
3194         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3195         {
3196           config_base_dir[len - 1] = 0;
3197           len--;
3198         }
3199
3200         if (len < 1)
3201         {
3202           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3203           return (4);
3204         }
3205
3206         _config_base_dir_len = len;
3207
3208         len = strlen (base_realpath);
3209         while ((len > 0) && (base_realpath[len - 1] == '/'))
3210         {
3211           base_realpath[len - 1] = '\0';
3212           len--;
3213         }
3214
3215         if (strncmp(config_base_dir,
3216                          base_realpath, sizeof(base_realpath)) != 0)
3217         {
3218           fprintf(stderr,
3219                   "Base directory (-b) resolved via file system links!\n"
3220                   "Please consult rrdcached '-b' documentation!\n"
3221                   "Consider specifying the real directory (%s)\n",
3222                   base_realpath);
3223           return 5;
3224         }
3225       }
3226       break;
3227
3228       case 'p':
3229       {
3230         if (config_pid_file != NULL)
3231           free (config_pid_file);
3232         config_pid_file = strdup (optarg);
3233         if (config_pid_file == NULL)
3234         {
3235           fprintf (stderr, "read_options: strdup failed.\n");
3236           return (3);
3237         }
3238       }
3239       break;
3240
3241       case 'F':
3242         config_flush_at_shutdown = 1;
3243         break;
3244
3245       case 'j':
3246       {
3247         const char *dir = journal_dir = strdup(optarg);
3248
3249         status = rrd_mkdir_p(dir, 0777);
3250         if (status != 0)
3251         {
3252           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3253               dir, rrd_strerror(errno));
3254           return 6;
3255         }
3256
3257         if (access(dir, R_OK|W_OK|X_OK) != 0)
3258         {
3259           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3260                   errno ? rrd_strerror(errno) : "");
3261           return 6;
3262         }
3263       }
3264       break;
3265
3266       case 'a':
3267       {
3268         int temp = atoi(optarg);
3269         if (temp > 0)
3270           config_alloc_chunk = temp;
3271         else
3272         {
3273           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3274           return 10;
3275         }
3276       }
3277       break;
3278
3279       case 'h':
3280       case '?':
3281         printf ("RRDCacheD %s\n"
3282             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3283             "\n"
3284             "Usage: rrdcached [options]\n"
3285             "\n"
3286             "Valid options are:\n"
3287             "  -l <address>  Socket address to listen to.\n"
3288             "  -P <perms>    Sets the permissions to assign to all following "
3289                             "sockets\n"
3290             "  -w <seconds>  Interval in which to write data.\n"
3291             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3292             "  -t <threads>  Number of write threads.\n"
3293             "  -f <seconds>  Interval in which to flush dead data.\n"
3294             "  -p <file>     Location of the PID-file.\n"
3295             "  -b <dir>      Base directory to change to.\n"
3296             "  -B            Restrict file access to paths within -b <dir>\n"
3297             "  -g            Do not fork and run in the foreground.\n"
3298             "  -j <dir>      Directory in which to create the journal files.\n"
3299             "  -F            Always flush all updates at shutdown\n"
3300             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3301             "                (the socket will also have read/write permissions "
3302                             "for that group)\n"
3303             "  -m <mode>     File permissions (octal) of all following UNIX "
3304                             "sockets\n"
3305             "  -a <size>     Memory allocation chunk size. Default is 1."
3306             "\n"
3307             "For more information and a detailed description of all options "
3308             "please refer\n"
3309             "to the rrdcached(1) manual page.\n",
3310             VERSION);
3311         if (option == 'h')
3312           status = -1;
3313         else
3314           status = 1;
3315         break;
3316     } /* switch (option) */
3317   } /* while (getopt) */
3318
3319   /* advise the user when values are not sane */
3320   if (config_flush_interval < 2 * config_write_interval)
3321     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3322             " 2x write interval (-w) !\n");
3323   if (config_write_jitter > config_write_interval)
3324     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3325             " write interval (-w) !\n");
3326
3327   if (config_write_base_only && config_base_dir == NULL)
3328     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3329             "  Consult the rrdcached documentation\n");
3330
3331   if (journal_dir == NULL)
3332     config_flush_at_shutdown = 1;
3333
3334   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3335
3336   return (status);
3337 } /* }}} int read_options */
3338
3339 int main (int argc, char **argv)
3340 {
3341   int status;
3342
3343   status = read_options (argc, argv);
3344   if (status != 0)
3345   {
3346     if (status < 0)
3347       status = 0;
3348     return (status);
3349   }
3350
3351   status = daemonize ();
3352   if (status != 0)
3353   {
3354     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3355     return (1);
3356   }
3357
3358   journal_init();
3359
3360   /* start the queue threads */
3361   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3362   if (queue_threads == NULL)
3363   {
3364     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3365     cleanup();
3366     return (1);
3367   }
3368   for (int i = 0; i < config_queue_threads; i++)
3369   {
3370     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3371     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3372     if (status != 0)
3373     {
3374       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3375       cleanup();
3376       return (1);
3377     }
3378   }
3379
3380   /* start the flush thread */
3381   memset(&flush_thread, 0, sizeof(flush_thread));
3382   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3383   if (status != 0)
3384   {
3385     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3386     cleanup();
3387     return (1);
3388   }
3389
3390   listen_thread_main (NULL);
3391   cleanup ();
3392
3393   return (0);
3394 } /* int main */
3395
3396 /*
3397  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3398  */