rrdcached: Added -m command line option. This option may be used to specify
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
110
111 #include <glib-2.0/glib.h>
112 /* }}} */
113
114 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
115
116 #ifndef __GNUC__
117 # define __attribute__(x) /**/
118 #endif
119
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124
125 struct listen_socket_s
126 {
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
130
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
134
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
139
140   char *wbuf;
141   ssize_t wbuf_len;
142
143   uint32_t permissions;
144
145   gid_t  socket_group;
146   mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
149
150 struct command_s;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
154                         time_t now              __attribute__((unused)),\
155                         char  *buffer           __attribute__((unused)),\
156                         size_t buffer_size      __attribute__((unused))
157
158 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
159                         DISPATCH_PROTO
160
161 struct command_s {
162   char   *cmd;
163   int (*handler)(HANDLER_PROTO);
164
165   char  context;                /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT      (1<<0)
167 #define CMD_CONTEXT_BATCH       (1<<1)
168 #define CMD_CONTEXT_JOURNAL     (1<<2)
169 #define CMD_CONTEXT_ANY         (0x7f)
170
171   char *syntax;
172   char *help;
173 };
174
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s
178 {
179   char *file;
180   char **values;
181   size_t values_num;
182   time_t last_flush_time;
183   time_t last_update_stamp;
184 #define CI_FLAGS_IN_TREE  (1<<0)
185 #define CI_FLAGS_IN_QUEUE (1<<1)
186   int flags;
187   pthread_cond_t  flushed;
188   cache_item_t *prev;
189   cache_item_t *next;
190 };
191
192 struct callback_flush_data_s
193 {
194   time_t now;
195   time_t abs_timeout;
196   char **keys;
197   size_t keys_num;
198 };
199 typedef struct callback_flush_data_s callback_flush_data_t;
200
201 enum queue_side_e
202 {
203   HEAD,
204   TAIL
205 };
206 typedef enum queue_side_e queue_side_t;
207
208 /* describe a set of journal files */
209 typedef struct {
210   char **files;
211   size_t files_num;
212 } journal_set;
213
214 /* max length of socket command or response */
215 #define CMD_MAX 4096
216 #define RBUF_SIZE (CMD_MAX*2)
217
218 /*
219  * Variables
220  */
221 static int stay_foreground = 0;
222 static uid_t daemon_uid;
223
224 static listen_socket_t *listen_fds = NULL;
225 static size_t listen_fds_num = 0;
226
227 enum {
228   RUNNING,              /* normal operation */
229   FLUSHING,             /* flushing remaining values */
230   SHUTDOWN              /* shutting down */
231 } state = RUNNING;
232
233 static pthread_t *queue_threads;
234 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
235 static int config_queue_threads = 4;
236
237 static pthread_t flush_thread;
238 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
239
240 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
241 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
242 static int connection_threads_num = 0;
243
244 /* Cache stuff */
245 static GTree          *cache_tree = NULL;
246 static cache_item_t   *cache_queue_head = NULL;
247 static cache_item_t   *cache_queue_tail = NULL;
248 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
249
250 static int config_write_interval = 300;
251 static int config_write_jitter   = 0;
252 static int config_flush_interval = 3600;
253 static int config_flush_at_shutdown = 0;
254 static char *config_pid_file = NULL;
255 static char *config_base_dir = NULL;
256 static size_t _config_base_dir_len = 0;
257 static int config_write_base_only = 0;
258
259 static listen_socket_t **config_listen_address_list = NULL;
260 static size_t config_listen_address_list_len = 0;
261
262 static uint64_t stats_queue_length = 0;
263 static uint64_t stats_updates_received = 0;
264 static uint64_t stats_flush_received = 0;
265 static uint64_t stats_updates_written = 0;
266 static uint64_t stats_data_sets_written = 0;
267 static uint64_t stats_journal_bytes = 0;
268 static uint64_t stats_journal_rotate = 0;
269 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
270
271 /* Journaled updates */
272 #define JOURNAL_REPLAY(s) ((s) == NULL)
273 #define JOURNAL_BASE "rrd.journal"
274 static journal_set *journal_cur = NULL;
275 static journal_set *journal_old = NULL;
276 static char *journal_dir = NULL;
277 static FILE *journal_fh = NULL;         /* current journal file handle */
278 static long  journal_size = 0;          /* current journal size */
279 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
280 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
281 static int journal_write(char *cmd, char *args);
282 static void journal_done(void);
283 static void journal_rotate(void);
284
285 /* prototypes for forward refernces */
286 static int handle_request_help (HANDLER_PROTO);
287
288 /* 
289  * Functions
290  */
291 static void sig_common (const char *sig) /* {{{ */
292 {
293   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
294   state = FLUSHING;
295   pthread_cond_broadcast(&flush_cond);
296   pthread_cond_broadcast(&queue_cond);
297 } /* }}} void sig_common */
298
299 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
300 {
301   sig_common("INT");
302 } /* }}} void sig_int_handler */
303
304 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
305 {
306   sig_common("TERM");
307 } /* }}} void sig_term_handler */
308
309 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
310 {
311   config_flush_at_shutdown = 1;
312   sig_common("USR1");
313 } /* }}} void sig_usr1_handler */
314
315 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
316 {
317   config_flush_at_shutdown = 0;
318   sig_common("USR2");
319 } /* }}} void sig_usr2_handler */
320
321 static void install_signal_handlers(void) /* {{{ */
322 {
323   /* These structures are static, because `sigaction' behaves weird if the are
324    * overwritten.. */
325   static struct sigaction sa_int;
326   static struct sigaction sa_term;
327   static struct sigaction sa_pipe;
328   static struct sigaction sa_usr1;
329   static struct sigaction sa_usr2;
330
331   /* Install signal handlers */
332   memset (&sa_int, 0, sizeof (sa_int));
333   sa_int.sa_handler = sig_int_handler;
334   sigaction (SIGINT, &sa_int, NULL);
335
336   memset (&sa_term, 0, sizeof (sa_term));
337   sa_term.sa_handler = sig_term_handler;
338   sigaction (SIGTERM, &sa_term, NULL);
339
340   memset (&sa_pipe, 0, sizeof (sa_pipe));
341   sa_pipe.sa_handler = SIG_IGN;
342   sigaction (SIGPIPE, &sa_pipe, NULL);
343
344   memset (&sa_pipe, 0, sizeof (sa_usr1));
345   sa_usr1.sa_handler = sig_usr1_handler;
346   sigaction (SIGUSR1, &sa_usr1, NULL);
347
348   memset (&sa_usr2, 0, sizeof (sa_usr2));
349   sa_usr2.sa_handler = sig_usr2_handler;
350   sigaction (SIGUSR2, &sa_usr2, NULL);
351
352 } /* }}} void install_signal_handlers */
353
354 static int open_pidfile(char *action, int oflag) /* {{{ */
355 {
356   int fd;
357   const char *file;
358   char *file_copy, *dir;
359
360   file = (config_pid_file != NULL)
361     ? config_pid_file
362     : LOCALSTATEDIR "/run/rrdcached.pid";
363
364   /* dirname may modify its argument */
365   file_copy = strdup(file);
366   if (file_copy == NULL)
367   {
368     fprintf(stderr, "rrdcached: strdup(): %s\n",
369         rrd_strerror(errno));
370     return -1;
371   }
372
373   dir = dirname(file_copy);
374   if (rrd_mkdir_p(dir, 0777) != 0)
375   {
376     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
377         dir, rrd_strerror(errno));
378     return -1;
379   }
380
381   free(file_copy);
382
383   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
384   if (fd < 0)
385     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
386             action, file, rrd_strerror(errno));
387
388   return(fd);
389 } /* }}} static int open_pidfile */
390
391 /* check existing pid file to see whether a daemon is running */
392 static int check_pidfile(void)
393 {
394   int pid_fd;
395   pid_t pid;
396   char pid_str[16];
397
398   pid_fd = open_pidfile("open", O_RDWR);
399   if (pid_fd < 0)
400     return pid_fd;
401
402   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
403     return -1;
404
405   pid = atoi(pid_str);
406   if (pid <= 0)
407     return -1;
408
409   /* another running process that we can signal COULD be
410    * a competing rrdcached */
411   if (pid != getpid() && kill(pid, 0) == 0)
412   {
413     fprintf(stderr,
414             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
415     close(pid_fd);
416     return -1;
417   }
418
419   lseek(pid_fd, 0, SEEK_SET);
420   if (ftruncate(pid_fd, 0) == -1)
421   {
422     fprintf(stderr,
423             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
424     close(pid_fd);
425     return -1;
426   }
427
428   fprintf(stderr,
429           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
430           "rrdcached: starting normally.\n", pid);
431
432   return pid_fd;
433 } /* }}} static int check_pidfile */
434
435 static int write_pidfile (int fd) /* {{{ */
436 {
437   pid_t pid;
438   FILE *fh;
439
440   pid = getpid ();
441
442   fh = fdopen (fd, "w");
443   if (fh == NULL)
444   {
445     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
446     close(fd);
447     return (-1);
448   }
449
450   fprintf (fh, "%i\n", (int) pid);
451   fclose (fh);
452
453   return (0);
454 } /* }}} int write_pidfile */
455
456 static int remove_pidfile (void) /* {{{ */
457 {
458   char *file;
459   int status;
460
461   file = (config_pid_file != NULL)
462     ? config_pid_file
463     : LOCALSTATEDIR "/run/rrdcached.pid";
464
465   status = unlink (file);
466   if (status == 0)
467     return (0);
468   return (errno);
469 } /* }}} int remove_pidfile */
470
471 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
472 {
473   char *eol;
474
475   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
476                sock->next_read - sock->next_cmd);
477
478   if (eol == NULL)
479   {
480     /* no commands left, move remainder back to front of rbuf */
481     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
482             sock->next_read - sock->next_cmd);
483     sock->next_read -= sock->next_cmd;
484     sock->next_cmd = 0;
485     *len = 0;
486     return NULL;
487   }
488   else
489   {
490     char *cmd = sock->rbuf + sock->next_cmd;
491     *eol = '\0';
492
493     sock->next_cmd = eol - sock->rbuf + 1;
494
495     if (eol > sock->rbuf && *(eol-1) == '\r')
496       *(--eol) = '\0'; /* handle "\r\n" EOL */
497
498     *len = eol - cmd;
499
500     return cmd;
501   }
502
503   /* NOTREACHED */
504   assert(1==0);
505 } /* }}} char *next_cmd */
506
507 /* add the characters directly to the write buffer */
508 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
509 {
510   char *new_buf;
511
512   assert(sock != NULL);
513
514   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
515   if (new_buf == NULL)
516   {
517     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
518     return -1;
519   }
520
521   strncpy(new_buf + sock->wbuf_len, str, len + 1);
522
523   sock->wbuf = new_buf;
524   sock->wbuf_len += len;
525
526   return 0;
527 } /* }}} static int add_to_wbuf */
528
529 /* add the text to the "extra" info that's sent after the status line */
530 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
531 {
532   va_list argp;
533   char buffer[CMD_MAX];
534   int len;
535
536   if (JOURNAL_REPLAY(sock)) return 0;
537   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
538
539   va_start(argp, fmt);
540 #ifdef HAVE_VSNPRINTF
541   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
542 #else
543   len = vsprintf(buffer, fmt, argp);
544 #endif
545   va_end(argp);
546   if (len < 0)
547   {
548     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
549     return -1;
550   }
551
552   return add_to_wbuf(sock, buffer, len);
553 } /* }}} static int add_response_info */
554
555 static int count_lines(char *str) /* {{{ */
556 {
557   int lines = 0;
558
559   if (str != NULL)
560   {
561     while ((str = strchr(str, '\n')) != NULL)
562     {
563       ++lines;
564       ++str;
565     }
566   }
567
568   return lines;
569 } /* }}} static int count_lines */
570
571 /* send the response back to the user.
572  * returns 0 on success, -1 on error
573  * write buffer is always zeroed after this call */
574 static int send_response (listen_socket_t *sock, response_code rc,
575                           char *fmt, ...) /* {{{ */
576 {
577   va_list argp;
578   char buffer[CMD_MAX];
579   int lines;
580   ssize_t wrote;
581   int rclen, len;
582
583   if (JOURNAL_REPLAY(sock)) return rc;
584
585   if (sock->batch_start)
586   {
587     if (rc == RESP_OK)
588       return rc; /* no response on success during BATCH */
589     lines = sock->batch_cmd;
590   }
591   else if (rc == RESP_OK)
592     lines = count_lines(sock->wbuf);
593   else
594     lines = -1;
595
596   rclen = sprintf(buffer, "%d ", lines);
597   va_start(argp, fmt);
598 #ifdef HAVE_VSNPRINTF
599   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
600 #else
601   len = vsprintf(buffer+rclen, fmt, argp);
602 #endif
603   va_end(argp);
604   if (len < 0)
605     return -1;
606
607   len += rclen;
608
609   /* append the result to the wbuf, don't write to the user */
610   if (sock->batch_start)
611     return add_to_wbuf(sock, buffer, len);
612
613   /* first write must be complete */
614   if (len != write(sock->fd, buffer, len))
615   {
616     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
617     return -1;
618   }
619
620   if (sock->wbuf != NULL && rc == RESP_OK)
621   {
622     wrote = 0;
623     while (wrote < sock->wbuf_len)
624     {
625       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
626       if (wb <= 0)
627       {
628         RRDD_LOG(LOG_INFO, "send_response: could not write results");
629         return -1;
630       }
631       wrote += wb;
632     }
633   }
634
635   free(sock->wbuf); sock->wbuf = NULL;
636   sock->wbuf_len = 0;
637
638   return 0;
639 } /* }}} */
640
641 static void wipe_ci_values(cache_item_t *ci, time_t when)
642 {
643   ci->values = NULL;
644   ci->values_num = 0;
645
646   ci->last_flush_time = when;
647   if (config_write_jitter > 0)
648     ci->last_flush_time += (rrd_random() % config_write_jitter);
649 }
650
651 /* remove_from_queue
652  * remove a "cache_item_t" item from the queue.
653  * must hold 'cache_lock' when calling this
654  */
655 static void remove_from_queue(cache_item_t *ci) /* {{{ */
656 {
657   if (ci == NULL) return;
658   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
659
660   if (ci->prev == NULL)
661     cache_queue_head = ci->next; /* reset head */
662   else
663     ci->prev->next = ci->next;
664
665   if (ci->next == NULL)
666     cache_queue_tail = ci->prev; /* reset the tail */
667   else
668     ci->next->prev = ci->prev;
669
670   ci->next = ci->prev = NULL;
671   ci->flags &= ~CI_FLAGS_IN_QUEUE;
672
673   pthread_mutex_lock (&stats_lock);
674   assert (stats_queue_length > 0);
675   stats_queue_length--;
676   pthread_mutex_unlock (&stats_lock);
677
678 } /* }}} static void remove_from_queue */
679
680 /* free the resources associated with the cache_item_t
681  * must hold cache_lock when calling this function
682  */
683 static void *free_cache_item(cache_item_t *ci) /* {{{ */
684 {
685   if (ci == NULL) return NULL;
686
687   remove_from_queue(ci);
688
689   for (size_t i=0; i < ci->values_num; i++)
690     free(ci->values[i]);
691
692   free (ci->values);
693   free (ci->file);
694
695   /* in case anyone is waiting */
696   pthread_cond_broadcast(&ci->flushed);
697   pthread_cond_destroy(&ci->flushed);
698
699   free (ci);
700
701   return NULL;
702 } /* }}} static void *free_cache_item */
703
704 /*
705  * enqueue_cache_item:
706  * `cache_lock' must be acquired before calling this function!
707  */
708 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
709     queue_side_t side)
710 {
711   if (ci == NULL)
712     return (-1);
713
714   if (ci->values_num == 0)
715     return (0);
716
717   if (side == HEAD)
718   {
719     if (cache_queue_head == ci)
720       return 0;
721
722     /* remove if further down in queue */
723     remove_from_queue(ci);
724
725     ci->prev = NULL;
726     ci->next = cache_queue_head;
727     if (ci->next != NULL)
728       ci->next->prev = ci;
729     cache_queue_head = ci;
730
731     if (cache_queue_tail == NULL)
732       cache_queue_tail = cache_queue_head;
733   }
734   else /* (side == TAIL) */
735   {
736     /* We don't move values back in the list.. */
737     if (ci->flags & CI_FLAGS_IN_QUEUE)
738       return (0);
739
740     assert (ci->next == NULL);
741     assert (ci->prev == NULL);
742
743     ci->prev = cache_queue_tail;
744
745     if (cache_queue_tail == NULL)
746       cache_queue_head = ci;
747     else
748       cache_queue_tail->next = ci;
749
750     cache_queue_tail = ci;
751   }
752
753   ci->flags |= CI_FLAGS_IN_QUEUE;
754
755   pthread_cond_signal(&queue_cond);
756   pthread_mutex_lock (&stats_lock);
757   stats_queue_length++;
758   pthread_mutex_unlock (&stats_lock);
759
760   return (0);
761 } /* }}} int enqueue_cache_item */
762
763 /*
764  * tree_callback_flush:
765  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
766  * while this is in progress.
767  */
768 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
769     gpointer data)
770 {
771   cache_item_t *ci;
772   callback_flush_data_t *cfd;
773
774   ci = (cache_item_t *) value;
775   cfd = (callback_flush_data_t *) data;
776
777   if (ci->flags & CI_FLAGS_IN_QUEUE)
778     return FALSE;
779
780   if (ci->values_num > 0
781       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
782   {
783     enqueue_cache_item (ci, TAIL);
784   }
785   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
786       && (ci->values_num <= 0))
787   {
788     assert ((char *) key == ci->file);
789     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
790     {
791       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
792       return (FALSE);
793     }
794   }
795
796   return (FALSE);
797 } /* }}} gboolean tree_callback_flush */
798
799 static int flush_old_values (int max_age)
800 {
801   callback_flush_data_t cfd;
802   size_t k;
803
804   memset (&cfd, 0, sizeof (cfd));
805   /* Pass the current time as user data so that we don't need to call
806    * `time' for each node. */
807   cfd.now = time (NULL);
808   cfd.keys = NULL;
809   cfd.keys_num = 0;
810
811   if (max_age > 0)
812     cfd.abs_timeout = cfd.now - max_age;
813   else
814     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
815
816   /* `tree_callback_flush' will return the keys of all values that haven't
817    * been touched in the last `config_flush_interval' seconds in `cfd'.
818    * The char*'s in this array point to the same memory as ci->file, so we
819    * don't need to free them separately. */
820   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
821
822   for (k = 0; k < cfd.keys_num; k++)
823   {
824     /* should never fail, since we have held the cache_lock
825      * the entire time */
826     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
827   }
828
829   if (cfd.keys != NULL)
830   {
831     free (cfd.keys);
832     cfd.keys = NULL;
833   }
834
835   return (0);
836 } /* int flush_old_values */
837
838 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
839 {
840   struct timeval now;
841   struct timespec next_flush;
842   int status;
843
844   gettimeofday (&now, NULL);
845   next_flush.tv_sec = now.tv_sec + config_flush_interval;
846   next_flush.tv_nsec = 1000 * now.tv_usec;
847
848   pthread_mutex_lock(&cache_lock);
849
850   while (state == RUNNING)
851   {
852     gettimeofday (&now, NULL);
853     if ((now.tv_sec > next_flush.tv_sec)
854         || ((now.tv_sec == next_flush.tv_sec)
855           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
856     {
857       RRDD_LOG(LOG_DEBUG, "flushing old values");
858
859       /* Determine the time of the next cache flush. */
860       next_flush.tv_sec = now.tv_sec + config_flush_interval;
861
862       /* Flush all values that haven't been written in the last
863        * `config_write_interval' seconds. */
864       flush_old_values (config_write_interval);
865
866       /* unlock the cache while we rotate so we don't block incoming
867        * updates if the fsync() blocks on disk I/O */
868       pthread_mutex_unlock(&cache_lock);
869       journal_rotate();
870       pthread_mutex_lock(&cache_lock);
871     }
872
873     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
874     if (status != 0 && status != ETIMEDOUT)
875     {
876       RRDD_LOG (LOG_ERR, "flush_thread_main: "
877                 "pthread_cond_timedwait returned %i.", status);
878     }
879   }
880
881   if (config_flush_at_shutdown)
882     flush_old_values (-1); /* flush everything */
883
884   state = SHUTDOWN;
885
886   pthread_mutex_unlock(&cache_lock);
887
888   return NULL;
889 } /* void *flush_thread_main */
890
891 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
892 {
893   pthread_mutex_lock (&cache_lock);
894
895   while (state != SHUTDOWN
896          || (cache_queue_head != NULL && config_flush_at_shutdown))
897   {
898     cache_item_t *ci;
899     char *file;
900     char **values;
901     size_t values_num;
902     int status;
903
904     /* Now, check if there's something to store away. If not, wait until
905      * something comes in. */
906     if (cache_queue_head == NULL)
907     {
908       status = pthread_cond_wait (&queue_cond, &cache_lock);
909       if ((status != 0) && (status != ETIMEDOUT))
910       {
911         RRDD_LOG (LOG_ERR, "queue_thread_main: "
912             "pthread_cond_wait returned %i.", status);
913       }
914     }
915
916     /* Check if a value has arrived. This may be NULL if we timed out or there
917      * was an interrupt such as a signal. */
918     if (cache_queue_head == NULL)
919       continue;
920
921     ci = cache_queue_head;
922
923     /* copy the relevant parts */
924     file = strdup (ci->file);
925     if (file == NULL)
926     {
927       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
928       continue;
929     }
930
931     assert(ci->values != NULL);
932     assert(ci->values_num > 0);
933
934     values = ci->values;
935     values_num = ci->values_num;
936
937     wipe_ci_values(ci, time(NULL));
938     remove_from_queue(ci);
939
940     pthread_mutex_unlock (&cache_lock);
941
942     rrd_clear_error ();
943     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
944     if (status != 0)
945     {
946       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
947           "rrd_update_r (%s) failed with status %i. (%s)",
948           file, status, rrd_get_error());
949     }
950
951     journal_write("wrote", file);
952
953     /* Search again in the tree.  It's possible someone issued a "FORGET"
954      * while we were writing the update values. */
955     pthread_mutex_lock(&cache_lock);
956     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
957     if (ci)
958       pthread_cond_broadcast(&ci->flushed);
959     pthread_mutex_unlock(&cache_lock);
960
961     if (status == 0)
962     {
963       pthread_mutex_lock (&stats_lock);
964       stats_updates_written++;
965       stats_data_sets_written += values_num;
966       pthread_mutex_unlock (&stats_lock);
967     }
968
969     rrd_free_ptrs((void ***) &values, &values_num);
970     free(file);
971
972     pthread_mutex_lock (&cache_lock);
973   }
974   pthread_mutex_unlock (&cache_lock);
975
976   return (NULL);
977 } /* }}} void *queue_thread_main */
978
979 static int buffer_get_field (char **buffer_ret, /* {{{ */
980     size_t *buffer_size_ret, char **field_ret)
981 {
982   char *buffer;
983   size_t buffer_pos;
984   size_t buffer_size;
985   char *field;
986   size_t field_size;
987   int status;
988
989   buffer = *buffer_ret;
990   buffer_pos = 0;
991   buffer_size = *buffer_size_ret;
992   field = *buffer_ret;
993   field_size = 0;
994
995   if (buffer_size <= 0)
996     return (-1);
997
998   /* This is ensured by `handle_request'. */
999   assert (buffer[buffer_size - 1] == '\0');
1000
1001   status = -1;
1002   while (buffer_pos < buffer_size)
1003   {
1004     /* Check for end-of-field or end-of-buffer */
1005     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1006     {
1007       field[field_size] = 0;
1008       field_size++;
1009       buffer_pos++;
1010       status = 0;
1011       break;
1012     }
1013     /* Handle escaped characters. */
1014     else if (buffer[buffer_pos] == '\\')
1015     {
1016       if (buffer_pos >= (buffer_size - 1))
1017         break;
1018       buffer_pos++;
1019       field[field_size] = buffer[buffer_pos];
1020       field_size++;
1021       buffer_pos++;
1022     }
1023     /* Normal operation */ 
1024     else
1025     {
1026       field[field_size] = buffer[buffer_pos];
1027       field_size++;
1028       buffer_pos++;
1029     }
1030   } /* while (buffer_pos < buffer_size) */
1031
1032   if (status != 0)
1033     return (status);
1034
1035   *buffer_ret = buffer + buffer_pos;
1036   *buffer_size_ret = buffer_size - buffer_pos;
1037   *field_ret = field;
1038
1039   return (0);
1040 } /* }}} int buffer_get_field */
1041
1042 /* if we're restricting writes to the base directory,
1043  * check whether the file falls within the dir
1044  * returns 1 if OK, otherwise 0
1045  */
1046 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1047 {
1048   assert(file != NULL);
1049
1050   if (!config_write_base_only
1051       || JOURNAL_REPLAY(sock)
1052       || config_base_dir == NULL)
1053     return 1;
1054
1055   if (strstr(file, "../") != NULL) goto err;
1056
1057   /* relative paths without "../" are ok */
1058   if (*file != '/') return 1;
1059
1060   /* file must be of the format base + "/" + <1+ char filename> */
1061   if (strlen(file) < _config_base_dir_len + 2) goto err;
1062   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1063   if (*(file + _config_base_dir_len) != '/') goto err;
1064
1065   return 1;
1066
1067 err:
1068   if (sock != NULL && sock->fd >= 0)
1069     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1070
1071   return 0;
1072 } /* }}} static int check_file_access */
1073
1074 /* when using a base dir, convert relative paths to absolute paths.
1075  * if necessary, modifies the "filename" pointer to point
1076  * to the new path created in "tmp".  "tmp" is provided
1077  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1078  *
1079  * this allows us to optimize for the expected case (absolute path)
1080  * with a no-op.
1081  */
1082 static void get_abs_path(char **filename, char *tmp)
1083 {
1084   assert(tmp != NULL);
1085   assert(filename != NULL && *filename != NULL);
1086
1087   if (config_base_dir == NULL || **filename == '/')
1088     return;
1089
1090   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1091   *filename = tmp;
1092 } /* }}} static int get_abs_path */
1093
1094 static int flush_file (const char *filename) /* {{{ */
1095 {
1096   cache_item_t *ci;
1097
1098   pthread_mutex_lock (&cache_lock);
1099
1100   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1101   if (ci == NULL)
1102   {
1103     pthread_mutex_unlock (&cache_lock);
1104     return (ENOENT);
1105   }
1106
1107   if (ci->values_num > 0)
1108   {
1109     /* Enqueue at head */
1110     enqueue_cache_item (ci, HEAD);
1111     pthread_cond_wait(&ci->flushed, &cache_lock);
1112   }
1113
1114   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1115    * may have been purged during our cond_wait() */
1116
1117   pthread_mutex_unlock(&cache_lock);
1118
1119   return (0);
1120 } /* }}} int flush_file */
1121
1122 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1123 {
1124   char *err = "Syntax error.\n";
1125
1126   if (cmd && cmd->syntax)
1127     err = cmd->syntax;
1128
1129   return send_response(sock, RESP_ERR, "Usage: %s", err);
1130 } /* }}} static int syntax_error() */
1131
1132 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1133 {
1134   uint64_t copy_queue_length;
1135   uint64_t copy_updates_received;
1136   uint64_t copy_flush_received;
1137   uint64_t copy_updates_written;
1138   uint64_t copy_data_sets_written;
1139   uint64_t copy_journal_bytes;
1140   uint64_t copy_journal_rotate;
1141
1142   uint64_t tree_nodes_number;
1143   uint64_t tree_depth;
1144
1145   pthread_mutex_lock (&stats_lock);
1146   copy_queue_length       = stats_queue_length;
1147   copy_updates_received   = stats_updates_received;
1148   copy_flush_received     = stats_flush_received;
1149   copy_updates_written    = stats_updates_written;
1150   copy_data_sets_written  = stats_data_sets_written;
1151   copy_journal_bytes      = stats_journal_bytes;
1152   copy_journal_rotate     = stats_journal_rotate;
1153   pthread_mutex_unlock (&stats_lock);
1154
1155   pthread_mutex_lock (&cache_lock);
1156   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1157   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1158   pthread_mutex_unlock (&cache_lock);
1159
1160   add_response_info(sock,
1161                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1162   add_response_info(sock,
1163                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1164   add_response_info(sock,
1165                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1166   add_response_info(sock,
1167                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1168   add_response_info(sock,
1169                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1170   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1171   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1172   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1173   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1174
1175   send_response(sock, RESP_OK, "Statistics follow\n");
1176
1177   return (0);
1178 } /* }}} int handle_request_stats */
1179
1180 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1181 {
1182   char *file, file_tmp[PATH_MAX];
1183   int status;
1184
1185   status = buffer_get_field (&buffer, &buffer_size, &file);
1186   if (status != 0)
1187   {
1188     return syntax_error(sock,cmd);
1189   }
1190   else
1191   {
1192     pthread_mutex_lock(&stats_lock);
1193     stats_flush_received++;
1194     pthread_mutex_unlock(&stats_lock);
1195
1196     get_abs_path(&file, file_tmp);
1197     if (!check_file_access(file, sock)) return 0;
1198
1199     status = flush_file (file);
1200     if (status == 0)
1201       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1202     else if (status == ENOENT)
1203     {
1204       /* no file in our tree; see whether it exists at all */
1205       struct stat statbuf;
1206
1207       memset(&statbuf, 0, sizeof(statbuf));
1208       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1209         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1210       else
1211         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1212     }
1213     else if (status < 0)
1214       return send_response(sock, RESP_ERR, "Internal error.\n");
1215     else
1216       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1217   }
1218
1219   /* NOTREACHED */
1220   assert(1==0);
1221 } /* }}} int handle_request_flush */
1222
1223 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1224 {
1225   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1226
1227   pthread_mutex_lock(&cache_lock);
1228   flush_old_values(-1);
1229   pthread_mutex_unlock(&cache_lock);
1230
1231   return send_response(sock, RESP_OK, "Started flush.\n");
1232 } /* }}} static int handle_request_flushall */
1233
1234 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1235 {
1236   int status;
1237   char *file, file_tmp[PATH_MAX];
1238   cache_item_t *ci;
1239
1240   status = buffer_get_field(&buffer, &buffer_size, &file);
1241   if (status != 0)
1242     return syntax_error(sock,cmd);
1243
1244   get_abs_path(&file, file_tmp);
1245
1246   pthread_mutex_lock(&cache_lock);
1247   ci = g_tree_lookup(cache_tree, file);
1248   if (ci == NULL)
1249   {
1250     pthread_mutex_unlock(&cache_lock);
1251     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1252   }
1253
1254   for (size_t i=0; i < ci->values_num; i++)
1255     add_response_info(sock, "%s\n", ci->values[i]);
1256
1257   pthread_mutex_unlock(&cache_lock);
1258   return send_response(sock, RESP_OK, "updates pending\n");
1259 } /* }}} static int handle_request_pending */
1260
1261 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1262 {
1263   int status;
1264   gboolean found;
1265   char *file, file_tmp[PATH_MAX];
1266
1267   status = buffer_get_field(&buffer, &buffer_size, &file);
1268   if (status != 0)
1269     return syntax_error(sock,cmd);
1270
1271   get_abs_path(&file, file_tmp);
1272   if (!check_file_access(file, sock)) return 0;
1273
1274   pthread_mutex_lock(&cache_lock);
1275   found = g_tree_remove(cache_tree, file);
1276   pthread_mutex_unlock(&cache_lock);
1277
1278   if (found == TRUE)
1279   {
1280     if (!JOURNAL_REPLAY(sock))
1281       journal_write("forget", file);
1282
1283     return send_response(sock, RESP_OK, "Gone!\n");
1284   }
1285   else
1286     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1287
1288   /* NOTREACHED */
1289   assert(1==0);
1290 } /* }}} static int handle_request_forget */
1291
1292 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1293 {
1294   cache_item_t *ci;
1295
1296   pthread_mutex_lock(&cache_lock);
1297
1298   ci = cache_queue_head;
1299   while (ci != NULL)
1300   {
1301     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1302     ci = ci->next;
1303   }
1304
1305   pthread_mutex_unlock(&cache_lock);
1306
1307   return send_response(sock, RESP_OK, "in queue.\n");
1308 } /* }}} int handle_request_queue */
1309
1310 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1311 {
1312   char *file, file_tmp[PATH_MAX];
1313   int values_num = 0;
1314   int status;
1315   char orig_buf[CMD_MAX];
1316
1317   cache_item_t *ci;
1318
1319   /* save it for the journal later */
1320   if (!JOURNAL_REPLAY(sock))
1321     strncpy(orig_buf, buffer, buffer_size);
1322
1323   status = buffer_get_field (&buffer, &buffer_size, &file);
1324   if (status != 0)
1325     return syntax_error(sock,cmd);
1326
1327   pthread_mutex_lock(&stats_lock);
1328   stats_updates_received++;
1329   pthread_mutex_unlock(&stats_lock);
1330
1331   get_abs_path(&file, file_tmp);
1332   if (!check_file_access(file, sock)) return 0;
1333
1334   pthread_mutex_lock (&cache_lock);
1335   ci = g_tree_lookup (cache_tree, file);
1336
1337   if (ci == NULL) /* {{{ */
1338   {
1339     struct stat statbuf;
1340     cache_item_t *tmp;
1341
1342     /* don't hold the lock while we setup; stat(2) might block */
1343     pthread_mutex_unlock(&cache_lock);
1344
1345     memset (&statbuf, 0, sizeof (statbuf));
1346     status = stat (file, &statbuf);
1347     if (status != 0)
1348     {
1349       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1350
1351       status = errno;
1352       if (status == ENOENT)
1353         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1354       else
1355         return send_response(sock, RESP_ERR,
1356                              "stat failed with error %i.\n", status);
1357     }
1358     if (!S_ISREG (statbuf.st_mode))
1359       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1360
1361     if (access(file, R_OK|W_OK) != 0)
1362       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1363                            file, rrd_strerror(errno));
1364
1365     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1366     if (ci == NULL)
1367     {
1368       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1369
1370       return send_response(sock, RESP_ERR, "malloc failed.\n");
1371     }
1372     memset (ci, 0, sizeof (cache_item_t));
1373
1374     ci->file = strdup (file);
1375     if (ci->file == NULL)
1376     {
1377       free (ci);
1378       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1379
1380       return send_response(sock, RESP_ERR, "strdup failed.\n");
1381     }
1382
1383     wipe_ci_values(ci, now);
1384     ci->flags = CI_FLAGS_IN_TREE;
1385     pthread_cond_init(&ci->flushed, NULL);
1386
1387     pthread_mutex_lock(&cache_lock);
1388
1389     /* another UPDATE might have added this entry in the meantime */
1390     tmp = g_tree_lookup (cache_tree, file);
1391     if (tmp == NULL)
1392       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1393     else
1394     {
1395       free_cache_item (ci);
1396       ci = tmp;
1397     }
1398
1399     /* state may have changed while we were unlocked */
1400     if (state == SHUTDOWN)
1401       return -1;
1402   } /* }}} */
1403   assert (ci != NULL);
1404
1405   /* don't re-write updates in replay mode */
1406   if (!JOURNAL_REPLAY(sock))
1407     journal_write("update", orig_buf);
1408
1409   while (buffer_size > 0)
1410   {
1411     char *value;
1412     time_t stamp;
1413     char *eostamp;
1414
1415     status = buffer_get_field (&buffer, &buffer_size, &value);
1416     if (status != 0)
1417     {
1418       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1419       break;
1420     }
1421
1422     /* make sure update time is always moving forward */
1423     stamp = strtol(value, &eostamp, 10);
1424     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1425     {
1426       pthread_mutex_unlock(&cache_lock);
1427       return send_response(sock, RESP_ERR,
1428                            "Cannot find timestamp in '%s'!\n", value);
1429     }
1430     else if (stamp <= ci->last_update_stamp)
1431     {
1432       pthread_mutex_unlock(&cache_lock);
1433       return send_response(sock, RESP_ERR,
1434                            "illegal attempt to update using time %ld when last"
1435                            " update time is %ld (minimum one second step)\n",
1436                            stamp, ci->last_update_stamp);
1437     }
1438     else
1439       ci->last_update_stamp = stamp;
1440
1441     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1442     {
1443       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1444       continue;
1445     }
1446
1447     values_num++;
1448   }
1449
1450   if (((now - ci->last_flush_time) >= config_write_interval)
1451       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1452       && (ci->values_num > 0))
1453   {
1454     enqueue_cache_item (ci, TAIL);
1455   }
1456
1457   pthread_mutex_unlock (&cache_lock);
1458
1459   if (values_num < 1)
1460     return send_response(sock, RESP_ERR, "No values updated.\n");
1461   else
1462     return send_response(sock, RESP_OK,
1463                          "errors, enqueued %i value(s).\n", values_num);
1464
1465   /* NOTREACHED */
1466   assert(1==0);
1467
1468 } /* }}} int handle_request_update */
1469
1470 /* we came across a "WROTE" entry during journal replay.
1471  * throw away any values that we have accumulated for this file
1472  */
1473 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1474 {
1475   cache_item_t *ci;
1476   const char *file = buffer;
1477
1478   pthread_mutex_lock(&cache_lock);
1479
1480   ci = g_tree_lookup(cache_tree, file);
1481   if (ci == NULL)
1482   {
1483     pthread_mutex_unlock(&cache_lock);
1484     return (0);
1485   }
1486
1487   if (ci->values)
1488     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1489
1490   wipe_ci_values(ci, now);
1491   remove_from_queue(ci);
1492
1493   pthread_mutex_unlock(&cache_lock);
1494   return (0);
1495 } /* }}} int handle_request_wrote */
1496
1497 /* start "BATCH" processing */
1498 static int batch_start (HANDLER_PROTO) /* {{{ */
1499 {
1500   int status;
1501   if (sock->batch_start)
1502     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1503
1504   status = send_response(sock, RESP_OK,
1505                          "Go ahead.  End with dot '.' on its own line.\n");
1506   sock->batch_start = time(NULL);
1507   sock->batch_cmd = 0;
1508
1509   return status;
1510 } /* }}} static int batch_start */
1511
1512 /* finish "BATCH" processing and return results to the client */
1513 static int batch_done (HANDLER_PROTO) /* {{{ */
1514 {
1515   assert(sock->batch_start);
1516   sock->batch_start = 0;
1517   sock->batch_cmd  = 0;
1518   return send_response(sock, RESP_OK, "errors\n");
1519 } /* }}} static int batch_done */
1520
1521 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1522 {
1523   return -1;
1524 } /* }}} static int handle_request_quit */
1525
1526 static command_t list_of_commands[] = { /* {{{ */
1527   {
1528     "UPDATE",
1529     handle_request_update,
1530     CMD_CONTEXT_ANY,
1531     "UPDATE <filename> <values> [<values> ...]\n"
1532     ,
1533     "Adds the given file to the internal cache if it is not yet known and\n"
1534     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1535     "for details.\n"
1536     "\n"
1537     "Each <values> has the following form:\n"
1538     "  <values> = <time>:<value>[:<value>[...]]\n"
1539     "See the rrdupdate(1) manpage for details.\n"
1540   },
1541   {
1542     "WROTE",
1543     handle_request_wrote,
1544     CMD_CONTEXT_JOURNAL,
1545     NULL,
1546     NULL
1547   },
1548   {
1549     "FLUSH",
1550     handle_request_flush,
1551     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1552     "FLUSH <filename>\n"
1553     ,
1554     "Adds the given filename to the head of the update queue and returns\n"
1555     "after it has been dequeued.\n"
1556   },
1557   {
1558     "FLUSHALL",
1559     handle_request_flushall,
1560     CMD_CONTEXT_CLIENT,
1561     "FLUSHALL\n"
1562     ,
1563     "Triggers writing of all pending updates.  Returns immediately.\n"
1564   },
1565   {
1566     "PENDING",
1567     handle_request_pending,
1568     CMD_CONTEXT_CLIENT,
1569     "PENDING <filename>\n"
1570     ,
1571     "Shows any 'pending' updates for a file, in order.\n"
1572     "The updates shown have not yet been written to the underlying RRD file.\n"
1573   },
1574   {
1575     "FORGET",
1576     handle_request_forget,
1577     CMD_CONTEXT_ANY,
1578     "FORGET <filename>\n"
1579     ,
1580     "Removes the file completely from the cache.\n"
1581     "Any pending updates for the file will be lost.\n"
1582   },
1583   {
1584     "QUEUE",
1585     handle_request_queue,
1586     CMD_CONTEXT_CLIENT,
1587     "QUEUE\n"
1588     ,
1589         "Shows all files in the output queue.\n"
1590     "The output is zero or more lines in the following format:\n"
1591     "(where <num_vals> is the number of values to be written)\n"
1592     "\n"
1593     "<num_vals> <filename>\n"
1594   },
1595   {
1596     "STATS",
1597     handle_request_stats,
1598     CMD_CONTEXT_CLIENT,
1599     "STATS\n"
1600     ,
1601     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1602     "a description of the values.\n"
1603   },
1604   {
1605     "HELP",
1606     handle_request_help,
1607     CMD_CONTEXT_CLIENT,
1608     "HELP [<command>]\n",
1609     NULL, /* special! */
1610   },
1611   {
1612     "BATCH",
1613     batch_start,
1614     CMD_CONTEXT_CLIENT,
1615     "BATCH\n"
1616     ,
1617     "The 'BATCH' command permits the client to initiate a bulk load\n"
1618     "   of commands to rrdcached.\n"
1619     "\n"
1620     "Usage:\n"
1621     "\n"
1622     "    client: BATCH\n"
1623     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1624     "    client: command #1\n"
1625     "    client: command #2\n"
1626     "    client: ... and so on\n"
1627     "    client: .\n"
1628     "    server: 2 errors\n"
1629     "    server: 7 message for command #7\n"
1630     "    server: 9 message for command #9\n"
1631     "\n"
1632     "For more information, consult the rrdcached(1) documentation.\n"
1633   },
1634   {
1635     ".",   /* BATCH terminator */
1636     batch_done,
1637     CMD_CONTEXT_BATCH,
1638     NULL,
1639     NULL
1640   },
1641   {
1642     "QUIT",
1643     handle_request_quit,
1644     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1645     "QUIT\n"
1646     ,
1647     "Disconnect from rrdcached.\n"
1648   }
1649 }; /* }}} command_t list_of_commands[] */
1650 static size_t list_of_commands_len = sizeof (list_of_commands)
1651   / sizeof (list_of_commands[0]);
1652
1653 static command_t *find_command(char *cmd)
1654 {
1655   size_t i;
1656
1657   for (i = 0; i < list_of_commands_len; i++)
1658     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1659       return (&list_of_commands[i]);
1660   return NULL;
1661 }
1662
1663 /* We currently use the index in the `list_of_commands' array as a bit position
1664  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1665  * outside these functions so that switching to a more elegant storage method
1666  * is easily possible. */
1667 static ssize_t find_command_index (const char *cmd) /* {{{ */
1668 {
1669   size_t i;
1670
1671   for (i = 0; i < list_of_commands_len; i++)
1672     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1673       return ((ssize_t) i);
1674   return (-1);
1675 } /* }}} ssize_t find_command_index */
1676
1677 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1678     const char *cmd)
1679 {
1680   ssize_t i;
1681
1682   if (JOURNAL_REPLAY(sock))
1683     return (1);
1684
1685   if (cmd == NULL)
1686     return (-1);
1687
1688   if ((strcasecmp ("QUIT", cmd) == 0)
1689       || (strcasecmp ("HELP", cmd) == 0))
1690     return (1);
1691   else if (strcmp (".", cmd) == 0)
1692     cmd = "BATCH";
1693
1694   i = find_command_index (cmd);
1695   if (i < 0)
1696     return (-1);
1697   assert (i < 32);
1698
1699   if ((sock->permissions & (1 << i)) != 0)
1700     return (1);
1701   return (0);
1702 } /* }}} int socket_permission_check */
1703
1704 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1705     const char *cmd)
1706 {
1707   ssize_t i;
1708
1709   i = find_command_index (cmd);
1710   if (i < 0)
1711     return (-1);
1712   assert (i < 32);
1713
1714   sock->permissions |= (1 << i);
1715   return (0);
1716 } /* }}} int socket_permission_add */
1717
1718 /* check whether commands are received in the expected context */
1719 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1720 {
1721   if (JOURNAL_REPLAY(sock))
1722     return (cmd->context & CMD_CONTEXT_JOURNAL);
1723   else if (sock->batch_start)
1724     return (cmd->context & CMD_CONTEXT_BATCH);
1725   else
1726     return (cmd->context & CMD_CONTEXT_CLIENT);
1727
1728   /* NOTREACHED */
1729   assert(1==0);
1730 }
1731
1732 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1733 {
1734   int status;
1735   char *cmd_str;
1736   char *resp_txt;
1737   command_t *help = NULL;
1738
1739   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1740   if (status == 0)
1741     help = find_command(cmd_str);
1742
1743   if (help && (help->syntax || help->help))
1744   {
1745     char tmp[CMD_MAX];
1746
1747     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1748     resp_txt = tmp;
1749
1750     if (help->syntax)
1751       add_response_info(sock, "Usage: %s\n", help->syntax);
1752
1753     if (help->help)
1754       add_response_info(sock, "%s\n", help->help);
1755   }
1756   else
1757   {
1758     size_t i;
1759
1760     resp_txt = "Command overview\n";
1761
1762     for (i = 0; i < list_of_commands_len; i++)
1763     {
1764       if (list_of_commands[i].syntax == NULL)
1765         continue;
1766       add_response_info (sock, "%s", list_of_commands[i].syntax);
1767     }
1768   }
1769
1770   return send_response(sock, RESP_OK, resp_txt);
1771 } /* }}} int handle_request_help */
1772
1773 static int handle_request (DISPATCH_PROTO) /* {{{ */
1774 {
1775   char *buffer_ptr = buffer;
1776   char *cmd_str = NULL;
1777   command_t *cmd = NULL;
1778   int status;
1779
1780   assert (buffer[buffer_size - 1] == '\0');
1781
1782   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1783   if (status != 0)
1784   {
1785     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1786     return (-1);
1787   }
1788
1789   if (sock != NULL && sock->batch_start)
1790     sock->batch_cmd++;
1791
1792   cmd = find_command(cmd_str);
1793   if (!cmd)
1794     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1795
1796   if (!socket_permission_check (sock, cmd->cmd))
1797     return send_response(sock, RESP_ERR, "Permission denied.\n");
1798
1799   if (!command_check_context(sock, cmd))
1800     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1801
1802   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1803 } /* }}} int handle_request */
1804
1805 static void journal_set_free (journal_set *js) /* {{{ */
1806 {
1807   if (js == NULL)
1808     return;
1809
1810   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1811
1812   free(js);
1813 } /* }}} journal_set_free */
1814
1815 static void journal_set_remove (journal_set *js) /* {{{ */
1816 {
1817   if (js == NULL)
1818     return;
1819
1820   for (uint i=0; i < js->files_num; i++)
1821   {
1822     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1823     unlink(js->files[i]);
1824   }
1825 } /* }}} journal_set_remove */
1826
1827 /* close current journal file handle.
1828  * MUST hold journal_lock before calling */
1829 static void journal_close(void) /* {{{ */
1830 {
1831   if (journal_fh != NULL)
1832   {
1833     if (fclose(journal_fh) != 0)
1834       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1835   }
1836
1837   journal_fh = NULL;
1838   journal_size = 0;
1839 } /* }}} journal_close */
1840
1841 /* MUST hold journal_lock before calling */
1842 static void journal_new_file(void) /* {{{ */
1843 {
1844   struct timeval now;
1845   int  new_fd;
1846   char new_file[PATH_MAX + 1];
1847
1848   assert(journal_dir != NULL);
1849   assert(journal_cur != NULL);
1850
1851   journal_close();
1852
1853   gettimeofday(&now, NULL);
1854   /* this format assures that the files sort in strcmp() order */
1855   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1856            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1857
1858   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1859                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1860   if (new_fd < 0)
1861     goto error;
1862
1863   journal_fh = fdopen(new_fd, "a");
1864   if (journal_fh == NULL)
1865     goto error;
1866
1867   journal_size = ftell(journal_fh);
1868   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1869
1870   /* record the file in the journal set */
1871   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1872
1873   return;
1874
1875 error:
1876   RRDD_LOG(LOG_CRIT,
1877            "JOURNALING DISABLED: Error while trying to create %s : %s",
1878            new_file, rrd_strerror(errno));
1879   RRDD_LOG(LOG_CRIT,
1880            "JOURNALING DISABLED: All values will be flushed at shutdown");
1881
1882   close(new_fd);
1883   config_flush_at_shutdown = 1;
1884
1885 } /* }}} journal_new_file */
1886
1887 /* MUST NOT hold journal_lock before calling this */
1888 static void journal_rotate(void) /* {{{ */
1889 {
1890   journal_set *old_js = NULL;
1891
1892   if (journal_dir == NULL)
1893     return;
1894
1895   RRDD_LOG(LOG_DEBUG, "rotating journals");
1896
1897   pthread_mutex_lock(&stats_lock);
1898   ++stats_journal_rotate;
1899   pthread_mutex_unlock(&stats_lock);
1900
1901   pthread_mutex_lock(&journal_lock);
1902
1903   journal_close();
1904
1905   /* rotate the journal sets */
1906   old_js = journal_old;
1907   journal_old = journal_cur;
1908   journal_cur = calloc(1, sizeof(journal_set));
1909
1910   if (journal_cur != NULL)
1911     journal_new_file();
1912   else
1913     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1914
1915   pthread_mutex_unlock(&journal_lock);
1916
1917   journal_set_remove(old_js);
1918   journal_set_free  (old_js);
1919
1920 } /* }}} static void journal_rotate */
1921
1922 /* MUST hold journal_lock when calling */
1923 static void journal_done(void) /* {{{ */
1924 {
1925   if (journal_cur == NULL)
1926     return;
1927
1928   journal_close();
1929
1930   if (config_flush_at_shutdown)
1931   {
1932     RRDD_LOG(LOG_INFO, "removing journals");
1933     journal_set_remove(journal_old);
1934     journal_set_remove(journal_cur);
1935   }
1936   else
1937   {
1938     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1939              "journals will be used at next startup");
1940   }
1941
1942   journal_set_free(journal_cur);
1943   journal_set_free(journal_old);
1944   free(journal_dir);
1945
1946 } /* }}} static void journal_done */
1947
1948 static int journal_write(char *cmd, char *args) /* {{{ */
1949 {
1950   int chars;
1951
1952   if (journal_fh == NULL)
1953     return 0;
1954
1955   pthread_mutex_lock(&journal_lock);
1956   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1957   journal_size += chars;
1958
1959   if (journal_size > JOURNAL_MAX)
1960     journal_new_file();
1961
1962   pthread_mutex_unlock(&journal_lock);
1963
1964   if (chars > 0)
1965   {
1966     pthread_mutex_lock(&stats_lock);
1967     stats_journal_bytes += chars;
1968     pthread_mutex_unlock(&stats_lock);
1969   }
1970
1971   return chars;
1972 } /* }}} static int journal_write */
1973
1974 static int journal_replay (const char *file) /* {{{ */
1975 {
1976   FILE *fh;
1977   int entry_cnt = 0;
1978   int fail_cnt = 0;
1979   uint64_t line = 0;
1980   char entry[CMD_MAX];
1981   time_t now;
1982
1983   if (file == NULL) return 0;
1984
1985   {
1986     char *reason = "unknown error";
1987     int status = 0;
1988     struct stat statbuf;
1989
1990     memset(&statbuf, 0, sizeof(statbuf));
1991     if (stat(file, &statbuf) != 0)
1992     {
1993       reason = "stat error";
1994       status = errno;
1995     }
1996     else if (!S_ISREG(statbuf.st_mode))
1997     {
1998       reason = "not a regular file";
1999       status = EPERM;
2000     }
2001     if (statbuf.st_uid != daemon_uid)
2002     {
2003       reason = "not owned by daemon user";
2004       status = EACCES;
2005     }
2006     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2007     {
2008       reason = "must not be user/group writable";
2009       status = EACCES;
2010     }
2011
2012     if (status != 0)
2013     {
2014       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2015                file, rrd_strerror(status), reason);
2016       return 0;
2017     }
2018   }
2019
2020   fh = fopen(file, "r");
2021   if (fh == NULL)
2022   {
2023     if (errno != ENOENT)
2024       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2025                file, rrd_strerror(errno));
2026     return 0;
2027   }
2028   else
2029     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2030
2031   now = time(NULL);
2032
2033   while(!feof(fh))
2034   {
2035     size_t entry_len;
2036
2037     ++line;
2038     if (fgets(entry, sizeof(entry), fh) == NULL)
2039       break;
2040     entry_len = strlen(entry);
2041
2042     /* check \n termination in case journal writing crashed mid-line */
2043     if (entry_len == 0)
2044       continue;
2045     else if (entry[entry_len - 1] != '\n')
2046     {
2047       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2048       ++fail_cnt;
2049       continue;
2050     }
2051
2052     entry[entry_len - 1] = '\0';
2053
2054     if (handle_request(NULL, now, entry, entry_len) == 0)
2055       ++entry_cnt;
2056     else
2057       ++fail_cnt;
2058   }
2059
2060   fclose(fh);
2061
2062   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2063            entry_cnt, fail_cnt);
2064
2065   return entry_cnt > 0 ? 1 : 0;
2066 } /* }}} static int journal_replay */
2067
2068 static int journal_sort(const void *v1, const void *v2)
2069 {
2070   char **jn1 = (char **) v1;
2071   char **jn2 = (char **) v2;
2072
2073   return strcmp(*jn1,*jn2);
2074 }
2075
2076 static void journal_init(void) /* {{{ */
2077 {
2078   int had_journal = 0;
2079   DIR *dir;
2080   struct dirent *dent;
2081   char path[PATH_MAX+1];
2082
2083   if (journal_dir == NULL) return;
2084
2085   pthread_mutex_lock(&journal_lock);
2086
2087   journal_cur = calloc(1, sizeof(journal_set));
2088   if (journal_cur == NULL)
2089   {
2090     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2091     return;
2092   }
2093
2094   RRDD_LOG(LOG_INFO, "checking for journal files");
2095
2096   /* Handle old journal files during transition.  This gives them the
2097    * correct sort order.  TODO: remove after first release
2098    */
2099   {
2100     char old_path[PATH_MAX+1];
2101     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2102     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2103     rename(old_path, path);
2104
2105     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2106     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2107     rename(old_path, path);
2108   }
2109
2110   dir = opendir(journal_dir);
2111   while ((dent = readdir(dir)) != NULL)
2112   {
2113     /* looks like a journal file? */
2114     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2115       continue;
2116
2117     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2118
2119     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2120     {
2121       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2122                dent->d_name);
2123       break;
2124     }
2125   }
2126   closedir(dir);
2127
2128   qsort(journal_cur->files, journal_cur->files_num,
2129         sizeof(journal_cur->files[0]), journal_sort);
2130
2131   for (uint i=0; i < journal_cur->files_num; i++)
2132     had_journal += journal_replay(journal_cur->files[i]);
2133
2134   journal_new_file();
2135
2136   /* it must have been a crash.  start a flush */
2137   if (had_journal && config_flush_at_shutdown)
2138     flush_old_values(-1);
2139
2140   pthread_mutex_unlock(&journal_lock);
2141
2142   RRDD_LOG(LOG_INFO, "journal processing complete");
2143
2144 } /* }}} static void journal_init */
2145
2146 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2147 {
2148   assert(sock != NULL);
2149
2150   free(sock->rbuf);  sock->rbuf = NULL;
2151   free(sock->wbuf);  sock->wbuf = NULL;
2152   free(sock);
2153 } /* }}} void free_listen_socket */
2154
2155 static void close_connection(listen_socket_t *sock) /* {{{ */
2156 {
2157   if (sock->fd >= 0)
2158   {
2159     close(sock->fd);
2160     sock->fd = -1;
2161   }
2162
2163   free_listen_socket(sock);
2164
2165 } /* }}} void close_connection */
2166
2167 static void *connection_thread_main (void *args) /* {{{ */
2168 {
2169   listen_socket_t *sock;
2170   int fd;
2171
2172   sock = (listen_socket_t *) args;
2173   fd = sock->fd;
2174
2175   /* init read buffers */
2176   sock->next_read = sock->next_cmd = 0;
2177   sock->rbuf = malloc(RBUF_SIZE);
2178   if (sock->rbuf == NULL)
2179   {
2180     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2181     close_connection(sock);
2182     return NULL;
2183   }
2184
2185   pthread_mutex_lock (&connection_threads_lock);
2186   connection_threads_num++;
2187   pthread_mutex_unlock (&connection_threads_lock);
2188
2189   while (state == RUNNING)
2190   {
2191     char *cmd;
2192     ssize_t cmd_len;
2193     ssize_t rbytes;
2194     time_t now;
2195
2196     struct pollfd pollfd;
2197     int status;
2198
2199     pollfd.fd = fd;
2200     pollfd.events = POLLIN | POLLPRI;
2201     pollfd.revents = 0;
2202
2203     status = poll (&pollfd, 1, /* timeout = */ 500);
2204     if (state != RUNNING)
2205       break;
2206     else if (status == 0) /* timeout */
2207       continue;
2208     else if (status < 0) /* error */
2209     {
2210       status = errno;
2211       if (status != EINTR)
2212         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2213       continue;
2214     }
2215
2216     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2217       break;
2218     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2219     {
2220       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2221           "poll(2) returned something unexpected: %#04hx",
2222           pollfd.revents);
2223       break;
2224     }
2225
2226     rbytes = read(fd, sock->rbuf + sock->next_read,
2227                   RBUF_SIZE - sock->next_read);
2228     if (rbytes < 0)
2229     {
2230       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2231       break;
2232     }
2233     else if (rbytes == 0)
2234       break; /* eof */
2235
2236     sock->next_read += rbytes;
2237
2238     if (sock->batch_start)
2239       now = sock->batch_start;
2240     else
2241       now = time(NULL);
2242
2243     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2244     {
2245       status = handle_request (sock, now, cmd, cmd_len+1);
2246       if (status != 0)
2247         goto out_close;
2248     }
2249   }
2250
2251 out_close:
2252   close_connection(sock);
2253
2254   /* Remove this thread from the connection threads list */
2255   pthread_mutex_lock (&connection_threads_lock);
2256   connection_threads_num--;
2257   if (connection_threads_num <= 0)
2258     pthread_cond_broadcast(&connection_threads_done);
2259   pthread_mutex_unlock (&connection_threads_lock);
2260
2261   return (NULL);
2262 } /* }}} void *connection_thread_main */
2263
2264 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2265 {
2266   int fd;
2267   struct sockaddr_un sa;
2268   listen_socket_t *temp;
2269   int status;
2270   const char *path;
2271   char *path_copy, *dir;
2272
2273   path = sock->addr;
2274   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2275     path += strlen("unix:");
2276
2277   /* dirname may modify its argument */
2278   path_copy = strdup(path);
2279   if (path_copy == NULL)
2280   {
2281     fprintf(stderr, "rrdcached: strdup(): %s\n",
2282         rrd_strerror(errno));
2283     return (-1);
2284   }
2285
2286   dir = dirname(path_copy);
2287   if (rrd_mkdir_p(dir, 0777) != 0)
2288   {
2289     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2290         dir, rrd_strerror(errno));
2291     return (-1);
2292   }
2293
2294   free(path_copy);
2295
2296   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2297       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2298   if (temp == NULL)
2299   {
2300     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2301     return (-1);
2302   }
2303   listen_fds = temp;
2304   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2305
2306   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2307   if (fd < 0)
2308   {
2309     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2310              rrd_strerror(errno));
2311     return (-1);
2312   }
2313
2314   memset (&sa, 0, sizeof (sa));
2315   sa.sun_family = AF_UNIX;
2316   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2317
2318   /* if we've gotten this far, we own the pid file.  any daemon started
2319    * with the same args must not be alive.  therefore, ensure that we can
2320    * create the socket...
2321    */
2322   unlink(path);
2323
2324   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2325   if (status != 0)
2326   {
2327     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2328              path, rrd_strerror(errno));
2329     close (fd);
2330     return (-1);
2331   }
2332
2333   /* tweak the sockets group ownership */
2334   if (sock->socket_group != (gid_t)-1)
2335   {
2336     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2337          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2338     {
2339       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2340     }
2341   }
2342
2343   if (sock->socket_permissions != (mode_t)-1)
2344   {
2345     if (chmod(path, sock->socket_permissions) != 0)
2346       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2347           (unsigned int)sock->socket_permissions, strerror(errno));
2348   }
2349
2350   status = listen (fd, /* backlog = */ 10);
2351   if (status != 0)
2352   {
2353     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2354              path, rrd_strerror(errno));
2355     close (fd);
2356     unlink (path);
2357     return (-1);
2358   }
2359
2360   listen_fds[listen_fds_num].fd = fd;
2361   listen_fds[listen_fds_num].family = PF_UNIX;
2362   strncpy(listen_fds[listen_fds_num].addr, path,
2363           sizeof (listen_fds[listen_fds_num].addr) - 1);
2364   listen_fds_num++;
2365
2366   return (0);
2367 } /* }}} int open_listen_socket_unix */
2368
2369 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2370 {
2371   struct addrinfo ai_hints;
2372   struct addrinfo *ai_res;
2373   struct addrinfo *ai_ptr;
2374   char addr_copy[NI_MAXHOST];
2375   char *addr;
2376   char *port;
2377   int status;
2378
2379   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2380   addr_copy[sizeof (addr_copy) - 1] = 0;
2381   addr = addr_copy;
2382
2383   memset (&ai_hints, 0, sizeof (ai_hints));
2384   ai_hints.ai_flags = 0;
2385 #ifdef AI_ADDRCONFIG
2386   ai_hints.ai_flags |= AI_ADDRCONFIG;
2387 #endif
2388   ai_hints.ai_family = AF_UNSPEC;
2389   ai_hints.ai_socktype = SOCK_STREAM;
2390
2391   port = NULL;
2392   if (*addr == '[') /* IPv6+port format */
2393   {
2394     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2395     addr++;
2396
2397     port = strchr (addr, ']');
2398     if (port == NULL)
2399     {
2400       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2401       return (-1);
2402     }
2403     *port = 0;
2404     port++;
2405
2406     if (*port == ':')
2407       port++;
2408     else if (*port == 0)
2409       port = NULL;
2410     else
2411     {
2412       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2413       return (-1);
2414     }
2415   } /* if (*addr == '[') */
2416   else
2417   {
2418     port = rindex(addr, ':');
2419     if (port != NULL)
2420     {
2421       *port = 0;
2422       port++;
2423     }
2424   }
2425   ai_res = NULL;
2426   status = getaddrinfo (addr,
2427                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2428                         &ai_hints, &ai_res);
2429   if (status != 0)
2430   {
2431     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2432              addr, gai_strerror (status));
2433     return (-1);
2434   }
2435
2436   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2437   {
2438     int fd;
2439     listen_socket_t *temp;
2440     int one = 1;
2441
2442     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2443         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2444     if (temp == NULL)
2445     {
2446       fprintf (stderr,
2447                "rrdcached: open_listen_socket_network: realloc failed.\n");
2448       continue;
2449     }
2450     listen_fds = temp;
2451     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2452
2453     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2454     if (fd < 0)
2455     {
2456       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2457                rrd_strerror(errno));
2458       continue;
2459     }
2460
2461     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2462
2463     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2464     if (status != 0)
2465     {
2466       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2467                sock->addr, rrd_strerror(errno));
2468       close (fd);
2469       continue;
2470     }
2471
2472     status = listen (fd, /* backlog = */ 10);
2473     if (status != 0)
2474     {
2475       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2476                sock->addr, rrd_strerror(errno));
2477       close (fd);
2478       freeaddrinfo(ai_res);
2479       return (-1);
2480     }
2481
2482     listen_fds[listen_fds_num].fd = fd;
2483     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2484     listen_fds_num++;
2485   } /* for (ai_ptr) */
2486
2487   freeaddrinfo(ai_res);
2488   return (0);
2489 } /* }}} static int open_listen_socket_network */
2490
2491 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2492 {
2493   assert(sock != NULL);
2494   assert(sock->addr != NULL);
2495
2496   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2497       || sock->addr[0] == '/')
2498     return (open_listen_socket_unix(sock));
2499   else
2500     return (open_listen_socket_network(sock));
2501 } /* }}} int open_listen_socket */
2502
2503 static int close_listen_sockets (void) /* {{{ */
2504 {
2505   size_t i;
2506
2507   for (i = 0; i < listen_fds_num; i++)
2508   {
2509     close (listen_fds[i].fd);
2510
2511     if (listen_fds[i].family == PF_UNIX)
2512       unlink(listen_fds[i].addr);
2513   }
2514
2515   free (listen_fds);
2516   listen_fds = NULL;
2517   listen_fds_num = 0;
2518
2519   return (0);
2520 } /* }}} int close_listen_sockets */
2521
2522 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2523 {
2524   struct pollfd *pollfds;
2525   int pollfds_num;
2526   int status;
2527   int i;
2528
2529   if (listen_fds_num < 1)
2530   {
2531     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2532     return (NULL);
2533   }
2534
2535   pollfds_num = listen_fds_num;
2536   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2537   if (pollfds == NULL)
2538   {
2539     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2540     return (NULL);
2541   }
2542   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2543
2544   RRDD_LOG(LOG_INFO, "listening for connections");
2545
2546   while (state == RUNNING)
2547   {
2548     for (i = 0; i < pollfds_num; i++)
2549     {
2550       pollfds[i].fd = listen_fds[i].fd;
2551       pollfds[i].events = POLLIN | POLLPRI;
2552       pollfds[i].revents = 0;
2553     }
2554
2555     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2556     if (state != RUNNING)
2557       break;
2558     else if (status == 0) /* timeout */
2559       continue;
2560     else if (status < 0) /* error */
2561     {
2562       status = errno;
2563       if (status != EINTR)
2564       {
2565         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2566       }
2567       continue;
2568     }
2569
2570     for (i = 0; i < pollfds_num; i++)
2571     {
2572       listen_socket_t *client_sock;
2573       struct sockaddr_storage client_sa;
2574       socklen_t client_sa_size;
2575       pthread_t tid;
2576       pthread_attr_t attr;
2577
2578       if (pollfds[i].revents == 0)
2579         continue;
2580
2581       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2582       {
2583         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2584             "poll(2) returned something unexpected for listen FD #%i.",
2585             pollfds[i].fd);
2586         continue;
2587       }
2588
2589       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2590       if (client_sock == NULL)
2591       {
2592         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2593         continue;
2594       }
2595       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2596
2597       client_sa_size = sizeof (client_sa);
2598       client_sock->fd = accept (pollfds[i].fd,
2599           (struct sockaddr *) &client_sa, &client_sa_size);
2600       if (client_sock->fd < 0)
2601       {
2602         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2603         free(client_sock);
2604         continue;
2605       }
2606
2607       pthread_attr_init (&attr);
2608       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2609
2610       status = pthread_create (&tid, &attr, connection_thread_main,
2611                                client_sock);
2612       if (status != 0)
2613       {
2614         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2615         close_connection(client_sock);
2616         continue;
2617       }
2618     } /* for (pollfds_num) */
2619   } /* while (state == RUNNING) */
2620
2621   RRDD_LOG(LOG_INFO, "starting shutdown");
2622
2623   close_listen_sockets ();
2624
2625   pthread_mutex_lock (&connection_threads_lock);
2626   while (connection_threads_num > 0)
2627     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2628   pthread_mutex_unlock (&connection_threads_lock);
2629
2630   free(pollfds);
2631
2632   return (NULL);
2633 } /* }}} void *listen_thread_main */
2634
2635 static int daemonize (void) /* {{{ */
2636 {
2637   int pid_fd;
2638   char *base_dir;
2639
2640   daemon_uid = geteuid();
2641
2642   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2643   if (pid_fd < 0)
2644     pid_fd = check_pidfile();
2645   if (pid_fd < 0)
2646     return pid_fd;
2647
2648   /* open all the listen sockets */
2649   if (config_listen_address_list_len > 0)
2650   {
2651     for (size_t i = 0; i < config_listen_address_list_len; i++)
2652       open_listen_socket (config_listen_address_list[i]);
2653
2654     rrd_free_ptrs((void ***) &config_listen_address_list,
2655                   &config_listen_address_list_len);
2656   }
2657   else
2658   {
2659     listen_socket_t sock;
2660     memset(&sock, 0, sizeof(sock));
2661     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2662     open_listen_socket (&sock);
2663   }
2664
2665   if (listen_fds_num < 1)
2666   {
2667     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2668     goto error;
2669   }
2670
2671   if (!stay_foreground)
2672   {
2673     pid_t child;
2674
2675     child = fork ();
2676     if (child < 0)
2677     {
2678       fprintf (stderr, "daemonize: fork(2) failed.\n");
2679       goto error;
2680     }
2681     else if (child > 0)
2682       exit(0);
2683
2684     /* Become session leader */
2685     setsid ();
2686
2687     /* Open the first three file descriptors to /dev/null */
2688     close (2);
2689     close (1);
2690     close (0);
2691
2692     open ("/dev/null", O_RDWR);
2693     if (dup(0) == -1 || dup(0) == -1){
2694         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2695     }
2696   } /* if (!stay_foreground) */
2697
2698   /* Change into the /tmp directory. */
2699   base_dir = (config_base_dir != NULL)
2700     ? config_base_dir
2701     : "/tmp";
2702
2703   if (chdir (base_dir) != 0)
2704   {
2705     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2706     goto error;
2707   }
2708
2709   install_signal_handlers();
2710
2711   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2712   RRDD_LOG(LOG_INFO, "starting up");
2713
2714   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2715                                 (GDestroyNotify) free_cache_item);
2716   if (cache_tree == NULL)
2717   {
2718     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2719     goto error;
2720   }
2721
2722   return write_pidfile (pid_fd);
2723
2724 error:
2725   remove_pidfile();
2726   return -1;
2727 } /* }}} int daemonize */
2728
2729 static int cleanup (void) /* {{{ */
2730 {
2731   pthread_cond_broadcast (&flush_cond);
2732   pthread_join (flush_thread, NULL);
2733
2734   pthread_cond_broadcast (&queue_cond);
2735   for (int i = 0; i < config_queue_threads; i++)
2736     pthread_join (queue_threads[i], NULL);
2737
2738   if (config_flush_at_shutdown)
2739   {
2740     assert(cache_queue_head == NULL);
2741     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2742   }
2743
2744   free(queue_threads);
2745   free(config_base_dir);
2746
2747   pthread_mutex_lock(&cache_lock);
2748   g_tree_destroy(cache_tree);
2749
2750   pthread_mutex_lock(&journal_lock);
2751   journal_done();
2752
2753   RRDD_LOG(LOG_INFO, "goodbye");
2754   closelog ();
2755
2756   remove_pidfile ();
2757   free(config_pid_file);
2758
2759   return (0);
2760 } /* }}} int cleanup */
2761
2762 static int read_options (int argc, char **argv) /* {{{ */
2763 {
2764   int option;
2765   int status = 0;
2766
2767   char **permissions = NULL;
2768   size_t permissions_len = 0;
2769
2770   gid_t  socket_group = (gid_t)-1;
2771   mode_t socket_permissions = (mode_t)-1;
2772
2773   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
2774   {
2775     switch (option)
2776     {
2777       case 'g':
2778         stay_foreground=1;
2779         break;
2780
2781       case 'l':
2782       {
2783         listen_socket_t *new;
2784
2785         new = malloc(sizeof(listen_socket_t));
2786         if (new == NULL)
2787         {
2788           fprintf(stderr, "read_options: malloc failed.\n");
2789           return(2);
2790         }
2791         memset(new, 0, sizeof(listen_socket_t));
2792
2793         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2794
2795         /* Add permissions to the socket {{{ */
2796         if (permissions_len != 0)
2797         {
2798           size_t i;
2799           for (i = 0; i < permissions_len; i++)
2800           {
2801             status = socket_permission_add (new, permissions[i]);
2802             if (status != 0)
2803             {
2804               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2805                   "socket failed. Most likely, this permission doesn't "
2806                   "exist. Check your command line.\n", permissions[i]);
2807               status = 4;
2808             }
2809           }
2810         }
2811         else /* if (permissions_len == 0) */
2812         {
2813           /* Add permission for ALL commands to the socket. */
2814           size_t i;
2815           for (i = 0; i < list_of_commands_len; i++)
2816           {
2817             status = socket_permission_add (new, list_of_commands[i].cmd);
2818             if (status != 0)
2819             {
2820               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2821                   "socket failed. This should never happen, ever! Sorry.\n",
2822                   permissions[i]);
2823               status = 4;
2824             }
2825           }
2826         }
2827         /* }}} Done adding permissions. */
2828
2829         new->socket_group = socket_group;
2830         new->socket_permissions = socket_permissions;
2831
2832         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2833                          &config_listen_address_list_len, new))
2834         {
2835           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2836           return (2);
2837         }
2838       }
2839       break;
2840
2841       /* set socket group permissions */
2842       case 's':
2843       {
2844         gid_t group_gid;
2845         struct group *grp;
2846
2847         group_gid = strtoul(optarg, NULL, 10);
2848         if (errno != EINVAL && group_gid>0)
2849         {
2850           /* we were passed a number */
2851           grp = getgrgid(group_gid);
2852         }
2853         else
2854         {
2855           grp = getgrnam(optarg);
2856         }
2857
2858         if (grp)
2859         {
2860           socket_group = grp->gr_gid;
2861         }
2862         else
2863         {
2864           /* no idea what the user wanted... */
2865           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2866           return (5);
2867         }
2868       }
2869       break;
2870
2871       /* set socket file permissions */
2872       case 'm':
2873       {
2874         long  tmp;
2875         char *endptr = NULL;
2876
2877         tmp = strtol (optarg, &endptr, 8);
2878         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2879             || (tmp > 07777) || (tmp < 0)) {
2880           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2881               optarg);
2882           return (5);
2883         }
2884
2885         socket_permissions = (mode_t)tmp;
2886       }
2887       break;
2888
2889       case 'P':
2890       {
2891         char *optcopy;
2892         char *saveptr;
2893         char *dummy;
2894         char *ptr;
2895
2896         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2897
2898         optcopy = strdup (optarg);
2899         dummy = optcopy;
2900         saveptr = NULL;
2901         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2902         {
2903           dummy = NULL;
2904           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2905         }
2906
2907         free (optcopy);
2908       }
2909       break;
2910
2911       case 'f':
2912       {
2913         int temp;
2914
2915         temp = atoi (optarg);
2916         if (temp > 0)
2917           config_flush_interval = temp;
2918         else
2919         {
2920           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2921           status = 3;
2922         }
2923       }
2924       break;
2925
2926       case 'w':
2927       {
2928         int temp;
2929
2930         temp = atoi (optarg);
2931         if (temp > 0)
2932           config_write_interval = temp;
2933         else
2934         {
2935           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2936           status = 2;
2937         }
2938       }
2939       break;
2940
2941       case 'z':
2942       {
2943         int temp;
2944
2945         temp = atoi(optarg);
2946         if (temp > 0)
2947           config_write_jitter = temp;
2948         else
2949         {
2950           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2951           status = 2;
2952         }
2953
2954         break;
2955       }
2956
2957       case 't':
2958       {
2959         int threads;
2960         threads = atoi(optarg);
2961         if (threads >= 1)
2962           config_queue_threads = threads;
2963         else
2964         {
2965           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2966           return 1;
2967         }
2968       }
2969       break;
2970
2971       case 'B':
2972         config_write_base_only = 1;
2973         break;
2974
2975       case 'b':
2976       {
2977         size_t len;
2978         char base_realpath[PATH_MAX];
2979
2980         if (config_base_dir != NULL)
2981           free (config_base_dir);
2982         config_base_dir = strdup (optarg);
2983         if (config_base_dir == NULL)
2984         {
2985           fprintf (stderr, "read_options: strdup failed.\n");
2986           return (3);
2987         }
2988
2989         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
2990         {
2991           fprintf (stderr, "Failed to create base directory '%s': %s\n",
2992               config_base_dir, rrd_strerror (errno));
2993           return (3);
2994         }
2995
2996         /* make sure that the base directory is not resolved via
2997          * symbolic links.  this makes some performance-enhancing
2998          * assumptions possible (we don't have to resolve paths
2999          * that start with a "/")
3000          */
3001         if (realpath(config_base_dir, base_realpath) == NULL)
3002         {
3003           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3004               "%s\n", config_base_dir, rrd_strerror(errno));
3005           return 5;
3006         }
3007
3008         len = strlen (config_base_dir);
3009         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3010         {
3011           config_base_dir[len - 1] = 0;
3012           len--;
3013         }
3014
3015         if (len < 1)
3016         {
3017           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3018           return (4);
3019         }
3020
3021         _config_base_dir_len = len;
3022
3023         len = strlen (base_realpath);
3024         while ((len > 0) && (base_realpath[len - 1] == '/'))
3025         {
3026           base_realpath[len - 1] = '\0';
3027           len--;
3028         }
3029
3030         if (strncmp(config_base_dir,
3031                          base_realpath, sizeof(base_realpath)) != 0)
3032         {
3033           fprintf(stderr,
3034                   "Base directory (-b) resolved via file system links!\n"
3035                   "Please consult rrdcached '-b' documentation!\n"
3036                   "Consider specifying the real directory (%s)\n",
3037                   base_realpath);
3038           return 5;
3039         }
3040       }
3041       break;
3042
3043       case 'p':
3044       {
3045         if (config_pid_file != NULL)
3046           free (config_pid_file);
3047         config_pid_file = strdup (optarg);
3048         if (config_pid_file == NULL)
3049         {
3050           fprintf (stderr, "read_options: strdup failed.\n");
3051           return (3);
3052         }
3053       }
3054       break;
3055
3056       case 'F':
3057         config_flush_at_shutdown = 1;
3058         break;
3059
3060       case 'j':
3061       {
3062         const char *dir = journal_dir = strdup(optarg);
3063
3064         status = rrd_mkdir_p(dir, 0777);
3065         if (status != 0)
3066         {
3067           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3068               dir, rrd_strerror(errno));
3069           return 6;
3070         }
3071
3072         if (access(dir, R_OK|W_OK|X_OK) != 0)
3073         {
3074           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3075                   errno ? rrd_strerror(errno) : "");
3076           return 6;
3077         }
3078       }
3079       break;
3080
3081       case 'h':
3082       case '?':
3083         printf ("RRDCacheD %s\n"
3084             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3085             "\n"
3086             "Usage: rrdcached [options]\n"
3087             "\n"
3088             "Valid options are:\n"
3089             "  -l <address>  Socket address to listen to.\n"
3090             "  -P <perms>    Sets the permissions to assign to all following "
3091                             "sockets\n"
3092             "  -w <seconds>  Interval in which to write data.\n"
3093             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3094             "  -t <threads>  Number of write threads.\n"
3095             "  -f <seconds>  Interval in which to flush dead data.\n"
3096             "  -p <file>     Location of the PID-file.\n"
3097             "  -b <dir>      Base directory to change to.\n"
3098             "  -B            Restrict file access to paths within -b <dir>\n"
3099             "  -g            Do not fork and run in the foreground.\n"
3100             "  -j <dir>      Directory in which to create the journal files.\n"
3101             "  -F            Always flush all updates at shutdown\n"
3102             "  -s <id|name>  Make socket g+rw to named group\n"
3103             "\n"
3104             "For more information and a detailed description of all options "
3105             "please refer\n"
3106             "to the rrdcached(1) manual page.\n",
3107             VERSION);
3108         status = -1;
3109         break;
3110     } /* switch (option) */
3111   } /* while (getopt) */
3112
3113   /* advise the user when values are not sane */
3114   if (config_flush_interval < 2 * config_write_interval)
3115     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3116             " 2x write interval (-w) !\n");
3117   if (config_write_jitter > config_write_interval)
3118     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3119             " write interval (-w) !\n");
3120
3121   if (config_write_base_only && config_base_dir == NULL)
3122     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3123             "  Consult the rrdcached documentation\n");
3124
3125   if (journal_dir == NULL)
3126     config_flush_at_shutdown = 1;
3127
3128   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3129
3130   return (status);
3131 } /* }}} int read_options */
3132
3133 int main (int argc, char **argv)
3134 {
3135   int status;
3136
3137   status = read_options (argc, argv);
3138   if (status != 0)
3139   {
3140     if (status < 0)
3141       status = 0;
3142     return (status);
3143   }
3144
3145   status = daemonize ();
3146   if (status != 0)
3147   {
3148     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3149     return (1);
3150   }
3151
3152   journal_init();
3153
3154   /* start the queue threads */
3155   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3156   if (queue_threads == NULL)
3157   {
3158     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3159     cleanup();
3160     return (1);
3161   }
3162   for (int i = 0; i < config_queue_threads; i++)
3163   {
3164     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3165     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3166     if (status != 0)
3167     {
3168       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3169       cleanup();
3170       return (1);
3171     }
3172   }
3173
3174   /* start the flush thread */
3175   memset(&flush_thread, 0, sizeof(flush_thread));
3176   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3177   if (status != 0)
3178   {
3179     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3180     cleanup();
3181     return (1);
3182   }
3183
3184   listen_thread_main (NULL);
3185   cleanup ();
3186
3187   return (0);
3188 } /* int main */
3189
3190 /*
3191  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3192  */