The journal files are time-stamped and replayed in order. This allows
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <dirent.h>
95 #include <fcntl.h>
96 #include <signal.h>
97 #include <sys/un.h>
98 #include <netdb.h>
99 #include <poll.h>
100 #include <syslog.h>
101 #include <pthread.h>
102 #include <errno.h>
103 #include <assert.h>
104 #include <sys/time.h>
105 #include <time.h>
106
107 #include <glib-2.0/glib.h>
108 /* }}} */
109
110 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
111
112 #ifndef __GNUC__
113 # define __attribute__(x) /**/
114 #endif
115
116 /*
117  * Types
118  */
119 typedef enum
120 {
121   PRIV_LOW,
122   PRIV_HIGH
123 } socket_privilege;
124
125 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
126
127 struct listen_socket_s
128 {
129   int fd;
130   char addr[PATH_MAX + 1];
131   int family;
132   socket_privilege privilege;
133
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
137
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
142
143   char *wbuf;
144   ssize_t wbuf_len;
145 };
146 typedef struct listen_socket_s listen_socket_t;
147
148 struct command;
149 /* note: guard against "unused" warnings in the handlers */
150 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
151                         time_t now              __attribute__((unused)),\
152                         char  *buffer           __attribute__((unused)),\
153                         size_t buffer_size      __attribute__((unused))
154
155 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
156                         DISPATCH_PROTO
157
158 struct command {
159   char   *cmd;
160   int (*handler)(HANDLER_PROTO);
161   socket_privilege min_priv;
162
163   char  context;                /* where we expect to see it */
164 #define CMD_CONTEXT_CLIENT      (1<<0)
165 #define CMD_CONTEXT_BATCH       (1<<1)
166 #define CMD_CONTEXT_JOURNAL     (1<<2)
167 #define CMD_CONTEXT_ANY         (0x7f)
168
169   char *syntax;
170   char *help;
171 };
172
173 struct cache_item_s;
174 typedef struct cache_item_s cache_item_t;
175 struct cache_item_s
176 {
177   char *file;
178   char **values;
179   size_t values_num;
180   time_t last_flush_time;
181   time_t last_update_stamp;
182 #define CI_FLAGS_IN_TREE  (1<<0)
183 #define CI_FLAGS_IN_QUEUE (1<<1)
184   int flags;
185   pthread_cond_t  flushed;
186   cache_item_t *prev;
187   cache_item_t *next;
188 };
189
190 struct callback_flush_data_s
191 {
192   time_t now;
193   time_t abs_timeout;
194   char **keys;
195   size_t keys_num;
196 };
197 typedef struct callback_flush_data_s callback_flush_data_t;
198
199 enum queue_side_e
200 {
201   HEAD,
202   TAIL
203 };
204 typedef enum queue_side_e queue_side_t;
205
206 /* describe a set of journal files */
207 typedef struct {
208   char **files;
209   size_t files_num;
210 } journal_set;
211
212 /* max length of socket command or response */
213 #define CMD_MAX 4096
214 #define RBUF_SIZE (CMD_MAX*2)
215
216 /*
217  * Variables
218  */
219 static int stay_foreground = 0;
220 static uid_t daemon_uid;
221
222 static listen_socket_t *listen_fds = NULL;
223 static size_t listen_fds_num = 0;
224
225 enum {
226   RUNNING,              /* normal operation */
227   FLUSHING,             /* flushing remaining values */
228   SHUTDOWN              /* shutting down */
229 } state = RUNNING;
230
231 static pthread_t *queue_threads;
232 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
233 static int config_queue_threads = 4;
234
235 static pthread_t flush_thread;
236 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
237
238 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
239 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
240 static int connection_threads_num = 0;
241
242 /* Cache stuff */
243 static GTree          *cache_tree = NULL;
244 static cache_item_t   *cache_queue_head = NULL;
245 static cache_item_t   *cache_queue_tail = NULL;
246 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
247
248 static int config_write_interval = 300;
249 static int config_write_jitter   = 0;
250 static int config_flush_interval = 3600;
251 static int config_flush_at_shutdown = 0;
252 static char *config_pid_file = NULL;
253 static char *config_base_dir = NULL;
254 static size_t _config_base_dir_len = 0;
255 static int config_write_base_only = 0;
256
257 static listen_socket_t **config_listen_address_list = NULL;
258 static size_t config_listen_address_list_len = 0;
259
260 static uint64_t stats_queue_length = 0;
261 static uint64_t stats_updates_received = 0;
262 static uint64_t stats_flush_received = 0;
263 static uint64_t stats_updates_written = 0;
264 static uint64_t stats_data_sets_written = 0;
265 static uint64_t stats_journal_bytes = 0;
266 static uint64_t stats_journal_rotate = 0;
267 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
268
269 /* Journaled updates */
270 #define JOURNAL_BASE "rrd.journal"
271 static journal_set *journal_cur = NULL;
272 static journal_set *journal_old = NULL;
273 static char *journal_dir = NULL;
274 static FILE *journal_fh = NULL;         /* current journal file handle */
275 static long  journal_size = 0;          /* current journal size */
276 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
277 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
278 static int journal_write(char *cmd, char *args);
279 static void journal_done(void);
280 static void journal_rotate(void);
281
282 /* prototypes for forward refernces */
283 static int handle_request_help (HANDLER_PROTO);
284
285 /* 
286  * Functions
287  */
288 static void sig_common (const char *sig) /* {{{ */
289 {
290   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
291   state = FLUSHING;
292   pthread_cond_broadcast(&flush_cond);
293   pthread_cond_broadcast(&queue_cond);
294 } /* }}} void sig_common */
295
296 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
297 {
298   sig_common("INT");
299 } /* }}} void sig_int_handler */
300
301 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
302 {
303   sig_common("TERM");
304 } /* }}} void sig_term_handler */
305
306 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
307 {
308   config_flush_at_shutdown = 1;
309   sig_common("USR1");
310 } /* }}} void sig_usr1_handler */
311
312 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
313 {
314   config_flush_at_shutdown = 0;
315   sig_common("USR2");
316 } /* }}} void sig_usr2_handler */
317
318 static void install_signal_handlers(void) /* {{{ */
319 {
320   /* These structures are static, because `sigaction' behaves weird if the are
321    * overwritten.. */
322   static struct sigaction sa_int;
323   static struct sigaction sa_term;
324   static struct sigaction sa_pipe;
325   static struct sigaction sa_usr1;
326   static struct sigaction sa_usr2;
327
328   /* Install signal handlers */
329   memset (&sa_int, 0, sizeof (sa_int));
330   sa_int.sa_handler = sig_int_handler;
331   sigaction (SIGINT, &sa_int, NULL);
332
333   memset (&sa_term, 0, sizeof (sa_term));
334   sa_term.sa_handler = sig_term_handler;
335   sigaction (SIGTERM, &sa_term, NULL);
336
337   memset (&sa_pipe, 0, sizeof (sa_pipe));
338   sa_pipe.sa_handler = SIG_IGN;
339   sigaction (SIGPIPE, &sa_pipe, NULL);
340
341   memset (&sa_pipe, 0, sizeof (sa_usr1));
342   sa_usr1.sa_handler = sig_usr1_handler;
343   sigaction (SIGUSR1, &sa_usr1, NULL);
344
345   memset (&sa_usr2, 0, sizeof (sa_usr2));
346   sa_usr2.sa_handler = sig_usr2_handler;
347   sigaction (SIGUSR2, &sa_usr2, NULL);
348
349 } /* }}} void install_signal_handlers */
350
351 static int open_pidfile(char *action, int oflag) /* {{{ */
352 {
353   int fd;
354   char *file;
355
356   file = (config_pid_file != NULL)
357     ? config_pid_file
358     : LOCALSTATEDIR "/run/rrdcached.pid";
359
360   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
361   if (fd < 0)
362     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
363             action, file, rrd_strerror(errno));
364
365   return(fd);
366 } /* }}} static int open_pidfile */
367
368 /* check existing pid file to see whether a daemon is running */
369 static int check_pidfile(void)
370 {
371   int pid_fd;
372   pid_t pid;
373   char pid_str[16];
374
375   pid_fd = open_pidfile("open", O_RDWR);
376   if (pid_fd < 0)
377     return pid_fd;
378
379   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
380     return -1;
381
382   pid = atoi(pid_str);
383   if (pid <= 0)
384     return -1;
385
386   /* another running process that we can signal COULD be
387    * a competing rrdcached */
388   if (pid != getpid() && kill(pid, 0) == 0)
389   {
390     fprintf(stderr,
391             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
392     close(pid_fd);
393     return -1;
394   }
395
396   lseek(pid_fd, 0, SEEK_SET);
397   if (ftruncate(pid_fd, 0) == -1)
398   {
399     fprintf(stderr,
400             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
401     close(pid_fd);
402     return -1;
403   }
404
405   fprintf(stderr,
406           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
407           "rrdcached: starting normally.\n", pid);
408
409   return pid_fd;
410 } /* }}} static int check_pidfile */
411
412 static int write_pidfile (int fd) /* {{{ */
413 {
414   pid_t pid;
415   FILE *fh;
416
417   pid = getpid ();
418
419   fh = fdopen (fd, "w");
420   if (fh == NULL)
421   {
422     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
423     close(fd);
424     return (-1);
425   }
426
427   fprintf (fh, "%i\n", (int) pid);
428   fclose (fh);
429
430   return (0);
431 } /* }}} int write_pidfile */
432
433 static int remove_pidfile (void) /* {{{ */
434 {
435   char *file;
436   int status;
437
438   file = (config_pid_file != NULL)
439     ? config_pid_file
440     : LOCALSTATEDIR "/run/rrdcached.pid";
441
442   status = unlink (file);
443   if (status == 0)
444     return (0);
445   return (errno);
446 } /* }}} int remove_pidfile */
447
448 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
449 {
450   char *eol;
451
452   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
453                sock->next_read - sock->next_cmd);
454
455   if (eol == NULL)
456   {
457     /* no commands left, move remainder back to front of rbuf */
458     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
459             sock->next_read - sock->next_cmd);
460     sock->next_read -= sock->next_cmd;
461     sock->next_cmd = 0;
462     *len = 0;
463     return NULL;
464   }
465   else
466   {
467     char *cmd = sock->rbuf + sock->next_cmd;
468     *eol = '\0';
469
470     sock->next_cmd = eol - sock->rbuf + 1;
471
472     if (eol > sock->rbuf && *(eol-1) == '\r')
473       *(--eol) = '\0'; /* handle "\r\n" EOL */
474
475     *len = eol - cmd;
476
477     return cmd;
478   }
479
480   /* NOTREACHED */
481   assert(1==0);
482 }
483
484 /* add the characters directly to the write buffer */
485 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
486 {
487   char *new_buf;
488
489   assert(sock != NULL);
490
491   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
492   if (new_buf == NULL)
493   {
494     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
495     return -1;
496   }
497
498   strncpy(new_buf + sock->wbuf_len, str, len + 1);
499
500   sock->wbuf = new_buf;
501   sock->wbuf_len += len;
502
503   return 0;
504 } /* }}} static int add_to_wbuf */
505
506 /* add the text to the "extra" info that's sent after the status line */
507 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
508 {
509   va_list argp;
510   char buffer[CMD_MAX];
511   int len;
512
513   if (sock == NULL) return 0; /* journal replay mode */
514   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
515
516   va_start(argp, fmt);
517 #ifdef HAVE_VSNPRINTF
518   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
519 #else
520   len = vsprintf(buffer, fmt, argp);
521 #endif
522   va_end(argp);
523   if (len < 0)
524   {
525     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
526     return -1;
527   }
528
529   return add_to_wbuf(sock, buffer, len);
530 } /* }}} static int add_response_info */
531
532 static int count_lines(char *str) /* {{{ */
533 {
534   int lines = 0;
535
536   if (str != NULL)
537   {
538     while ((str = strchr(str, '\n')) != NULL)
539     {
540       ++lines;
541       ++str;
542     }
543   }
544
545   return lines;
546 } /* }}} static int count_lines */
547
548 /* send the response back to the user.
549  * returns 0 on success, -1 on error
550  * write buffer is always zeroed after this call */
551 static int send_response (listen_socket_t *sock, response_code rc,
552                           char *fmt, ...) /* {{{ */
553 {
554   va_list argp;
555   char buffer[CMD_MAX];
556   int lines;
557   ssize_t wrote;
558   int rclen, len;
559
560   if (sock == NULL) return rc;  /* journal replay mode */
561
562   if (sock->batch_start)
563   {
564     if (rc == RESP_OK)
565       return rc; /* no response on success during BATCH */
566     lines = sock->batch_cmd;
567   }
568   else if (rc == RESP_OK)
569     lines = count_lines(sock->wbuf);
570   else
571     lines = -1;
572
573   rclen = sprintf(buffer, "%d ", lines);
574   va_start(argp, fmt);
575 #ifdef HAVE_VSNPRINTF
576   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
577 #else
578   len = vsprintf(buffer+rclen, fmt, argp);
579 #endif
580   va_end(argp);
581   if (len < 0)
582     return -1;
583
584   len += rclen;
585
586   /* append the result to the wbuf, don't write to the user */
587   if (sock->batch_start)
588     return add_to_wbuf(sock, buffer, len);
589
590   /* first write must be complete */
591   if (len != write(sock->fd, buffer, len))
592   {
593     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
594     return -1;
595   }
596
597   if (sock->wbuf != NULL && rc == RESP_OK)
598   {
599     wrote = 0;
600     while (wrote < sock->wbuf_len)
601     {
602       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
603       if (wb <= 0)
604       {
605         RRDD_LOG(LOG_INFO, "send_response: could not write results");
606         return -1;
607       }
608       wrote += wb;
609     }
610   }
611
612   free(sock->wbuf); sock->wbuf = NULL;
613   sock->wbuf_len = 0;
614
615   return 0;
616 } /* }}} */
617
618 static void wipe_ci_values(cache_item_t *ci, time_t when)
619 {
620   ci->values = NULL;
621   ci->values_num = 0;
622
623   ci->last_flush_time = when;
624   if (config_write_jitter > 0)
625     ci->last_flush_time += (rrd_random() % config_write_jitter);
626 }
627
628 /* remove_from_queue
629  * remove a "cache_item_t" item from the queue.
630  * must hold 'cache_lock' when calling this
631  */
632 static void remove_from_queue(cache_item_t *ci) /* {{{ */
633 {
634   if (ci == NULL) return;
635   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
636
637   if (ci->prev == NULL)
638     cache_queue_head = ci->next; /* reset head */
639   else
640     ci->prev->next = ci->next;
641
642   if (ci->next == NULL)
643     cache_queue_tail = ci->prev; /* reset the tail */
644   else
645     ci->next->prev = ci->prev;
646
647   ci->next = ci->prev = NULL;
648   ci->flags &= ~CI_FLAGS_IN_QUEUE;
649
650   pthread_mutex_lock (&stats_lock);
651   assert (stats_queue_length > 0);
652   stats_queue_length--;
653   pthread_mutex_unlock (&stats_lock);
654
655 } /* }}} static void remove_from_queue */
656
657 /* free the resources associated with the cache_item_t
658  * must hold cache_lock when calling this function
659  */
660 static void *free_cache_item(cache_item_t *ci) /* {{{ */
661 {
662   if (ci == NULL) return NULL;
663
664   remove_from_queue(ci);
665
666   for (size_t i=0; i < ci->values_num; i++)
667     free(ci->values[i]);
668
669   free (ci->values);
670   free (ci->file);
671
672   /* in case anyone is waiting */
673   pthread_cond_broadcast(&ci->flushed);
674   pthread_cond_destroy(&ci->flushed);
675
676   free (ci);
677
678   return NULL;
679 } /* }}} static void *free_cache_item */
680
681 /*
682  * enqueue_cache_item:
683  * `cache_lock' must be acquired before calling this function!
684  */
685 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
686     queue_side_t side)
687 {
688   if (ci == NULL)
689     return (-1);
690
691   if (ci->values_num == 0)
692     return (0);
693
694   if (side == HEAD)
695   {
696     if (cache_queue_head == ci)
697       return 0;
698
699     /* remove if further down in queue */
700     remove_from_queue(ci);
701
702     ci->prev = NULL;
703     ci->next = cache_queue_head;
704     if (ci->next != NULL)
705       ci->next->prev = ci;
706     cache_queue_head = ci;
707
708     if (cache_queue_tail == NULL)
709       cache_queue_tail = cache_queue_head;
710   }
711   else /* (side == TAIL) */
712   {
713     /* We don't move values back in the list.. */
714     if (ci->flags & CI_FLAGS_IN_QUEUE)
715       return (0);
716
717     assert (ci->next == NULL);
718     assert (ci->prev == NULL);
719
720     ci->prev = cache_queue_tail;
721
722     if (cache_queue_tail == NULL)
723       cache_queue_head = ci;
724     else
725       cache_queue_tail->next = ci;
726
727     cache_queue_tail = ci;
728   }
729
730   ci->flags |= CI_FLAGS_IN_QUEUE;
731
732   pthread_cond_signal(&queue_cond);
733   pthread_mutex_lock (&stats_lock);
734   stats_queue_length++;
735   pthread_mutex_unlock (&stats_lock);
736
737   return (0);
738 } /* }}} int enqueue_cache_item */
739
740 /*
741  * tree_callback_flush:
742  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
743  * while this is in progress.
744  */
745 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
746     gpointer data)
747 {
748   cache_item_t *ci;
749   callback_flush_data_t *cfd;
750
751   ci = (cache_item_t *) value;
752   cfd = (callback_flush_data_t *) data;
753
754   if (ci->flags & CI_FLAGS_IN_QUEUE)
755     return FALSE;
756
757   if (ci->values_num > 0
758       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
759   {
760     enqueue_cache_item (ci, TAIL);
761   }
762   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
763       && (ci->values_num <= 0))
764   {
765     assert ((char *) key == ci->file);
766     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
767     {
768       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
769       return (FALSE);
770     }
771   }
772
773   return (FALSE);
774 } /* }}} gboolean tree_callback_flush */
775
776 static int flush_old_values (int max_age)
777 {
778   callback_flush_data_t cfd;
779   size_t k;
780
781   memset (&cfd, 0, sizeof (cfd));
782   /* Pass the current time as user data so that we don't need to call
783    * `time' for each node. */
784   cfd.now = time (NULL);
785   cfd.keys = NULL;
786   cfd.keys_num = 0;
787
788   if (max_age > 0)
789     cfd.abs_timeout = cfd.now - max_age;
790   else
791     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
792
793   /* `tree_callback_flush' will return the keys of all values that haven't
794    * been touched in the last `config_flush_interval' seconds in `cfd'.
795    * The char*'s in this array point to the same memory as ci->file, so we
796    * don't need to free them separately. */
797   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
798
799   for (k = 0; k < cfd.keys_num; k++)
800   {
801     /* should never fail, since we have held the cache_lock
802      * the entire time */
803     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
804   }
805
806   if (cfd.keys != NULL)
807   {
808     free (cfd.keys);
809     cfd.keys = NULL;
810   }
811
812   return (0);
813 } /* int flush_old_values */
814
815 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
816 {
817   struct timeval now;
818   struct timespec next_flush;
819   int status;
820
821   gettimeofday (&now, NULL);
822   next_flush.tv_sec = now.tv_sec + config_flush_interval;
823   next_flush.tv_nsec = 1000 * now.tv_usec;
824
825   pthread_mutex_lock(&cache_lock);
826
827   while (state == RUNNING)
828   {
829     gettimeofday (&now, NULL);
830     if ((now.tv_sec > next_flush.tv_sec)
831         || ((now.tv_sec == next_flush.tv_sec)
832           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
833     {
834       RRDD_LOG(LOG_DEBUG, "flushing old values");
835
836       /* Determine the time of the next cache flush. */
837       next_flush.tv_sec = now.tv_sec + config_flush_interval;
838
839       /* Flush all values that haven't been written in the last
840        * `config_write_interval' seconds. */
841       flush_old_values (config_write_interval);
842
843       /* unlock the cache while we rotate so we don't block incoming
844        * updates if the fsync() blocks on disk I/O */
845       pthread_mutex_unlock(&cache_lock);
846       journal_rotate();
847       pthread_mutex_lock(&cache_lock);
848     }
849
850     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
851     if (status != 0 && status != ETIMEDOUT)
852     {
853       RRDD_LOG (LOG_ERR, "flush_thread_main: "
854                 "pthread_cond_timedwait returned %i.", status);
855     }
856   }
857
858   if (config_flush_at_shutdown)
859     flush_old_values (-1); /* flush everything */
860
861   state = SHUTDOWN;
862
863   pthread_mutex_unlock(&cache_lock);
864
865   return NULL;
866 } /* void *flush_thread_main */
867
868 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
869 {
870   pthread_mutex_lock (&cache_lock);
871
872   while (state != SHUTDOWN
873          || (cache_queue_head != NULL && config_flush_at_shutdown))
874   {
875     cache_item_t *ci;
876     char *file;
877     char **values;
878     size_t values_num;
879     int status;
880
881     /* Now, check if there's something to store away. If not, wait until
882      * something comes in. */
883     if (cache_queue_head == NULL)
884     {
885       status = pthread_cond_wait (&queue_cond, &cache_lock);
886       if ((status != 0) && (status != ETIMEDOUT))
887       {
888         RRDD_LOG (LOG_ERR, "queue_thread_main: "
889             "pthread_cond_wait returned %i.", status);
890       }
891     }
892
893     /* Check if a value has arrived. This may be NULL if we timed out or there
894      * was an interrupt such as a signal. */
895     if (cache_queue_head == NULL)
896       continue;
897
898     ci = cache_queue_head;
899
900     /* copy the relevant parts */
901     file = strdup (ci->file);
902     if (file == NULL)
903     {
904       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
905       continue;
906     }
907
908     assert(ci->values != NULL);
909     assert(ci->values_num > 0);
910
911     values = ci->values;
912     values_num = ci->values_num;
913
914     wipe_ci_values(ci, time(NULL));
915     remove_from_queue(ci);
916
917     pthread_mutex_unlock (&cache_lock);
918
919     rrd_clear_error ();
920     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
921     if (status != 0)
922     {
923       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
924           "rrd_update_r (%s) failed with status %i. (%s)",
925           file, status, rrd_get_error());
926     }
927
928     journal_write("wrote", file);
929
930     /* Search again in the tree.  It's possible someone issued a "FORGET"
931      * while we were writing the update values. */
932     pthread_mutex_lock(&cache_lock);
933     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
934     if (ci)
935       pthread_cond_broadcast(&ci->flushed);
936     pthread_mutex_unlock(&cache_lock);
937
938     if (status == 0)
939     {
940       pthread_mutex_lock (&stats_lock);
941       stats_updates_written++;
942       stats_data_sets_written += values_num;
943       pthread_mutex_unlock (&stats_lock);
944     }
945
946     rrd_free_ptrs((void ***) &values, &values_num);
947     free(file);
948
949     pthread_mutex_lock (&cache_lock);
950   }
951   pthread_mutex_unlock (&cache_lock);
952
953   return (NULL);
954 } /* }}} void *queue_thread_main */
955
956 static int buffer_get_field (char **buffer_ret, /* {{{ */
957     size_t *buffer_size_ret, char **field_ret)
958 {
959   char *buffer;
960   size_t buffer_pos;
961   size_t buffer_size;
962   char *field;
963   size_t field_size;
964   int status;
965
966   buffer = *buffer_ret;
967   buffer_pos = 0;
968   buffer_size = *buffer_size_ret;
969   field = *buffer_ret;
970   field_size = 0;
971
972   if (buffer_size <= 0)
973     return (-1);
974
975   /* This is ensured by `handle_request'. */
976   assert (buffer[buffer_size - 1] == '\0');
977
978   status = -1;
979   while (buffer_pos < buffer_size)
980   {
981     /* Check for end-of-field or end-of-buffer */
982     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
983     {
984       field[field_size] = 0;
985       field_size++;
986       buffer_pos++;
987       status = 0;
988       break;
989     }
990     /* Handle escaped characters. */
991     else if (buffer[buffer_pos] == '\\')
992     {
993       if (buffer_pos >= (buffer_size - 1))
994         break;
995       buffer_pos++;
996       field[field_size] = buffer[buffer_pos];
997       field_size++;
998       buffer_pos++;
999     }
1000     /* Normal operation */ 
1001     else
1002     {
1003       field[field_size] = buffer[buffer_pos];
1004       field_size++;
1005       buffer_pos++;
1006     }
1007   } /* while (buffer_pos < buffer_size) */
1008
1009   if (status != 0)
1010     return (status);
1011
1012   *buffer_ret = buffer + buffer_pos;
1013   *buffer_size_ret = buffer_size - buffer_pos;
1014   *field_ret = field;
1015
1016   return (0);
1017 } /* }}} int buffer_get_field */
1018
1019 /* if we're restricting writes to the base directory,
1020  * check whether the file falls within the dir
1021  * returns 1 if OK, otherwise 0
1022  */
1023 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1024 {
1025   assert(file != NULL);
1026
1027   if (!config_write_base_only
1028       || sock == NULL /* journal replay */
1029       || config_base_dir == NULL)
1030     return 1;
1031
1032   if (strstr(file, "../") != NULL) goto err;
1033
1034   /* relative paths without "../" are ok */
1035   if (*file != '/') return 1;
1036
1037   /* file must be of the format base + "/" + <1+ char filename> */
1038   if (strlen(file) < _config_base_dir_len + 2) goto err;
1039   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1040   if (*(file + _config_base_dir_len) != '/') goto err;
1041
1042   return 1;
1043
1044 err:
1045   if (sock != NULL && sock->fd >= 0)
1046     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1047
1048   return 0;
1049 } /* }}} static int check_file_access */
1050
1051 /* when using a base dir, convert relative paths to absolute paths.
1052  * if necessary, modifies the "filename" pointer to point
1053  * to the new path created in "tmp".  "tmp" is provided
1054  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1055  *
1056  * this allows us to optimize for the expected case (absolute path)
1057  * with a no-op.
1058  */
1059 static void get_abs_path(char **filename, char *tmp)
1060 {
1061   assert(tmp != NULL);
1062   assert(filename != NULL && *filename != NULL);
1063
1064   if (config_base_dir == NULL || **filename == '/')
1065     return;
1066
1067   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1068   *filename = tmp;
1069 } /* }}} static int get_abs_path */
1070
1071 /* returns 1 if we have the required privilege level,
1072  * otherwise issue an error to the user on sock */
1073 static int has_privilege (listen_socket_t *sock, /* {{{ */
1074                           socket_privilege priv)
1075 {
1076   if (sock == NULL) /* journal replay */
1077     return 1;
1078
1079   if (sock->privilege >= priv)
1080     return 1;
1081
1082   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1083 } /* }}} static int has_privilege */
1084
1085 static int flush_file (const char *filename) /* {{{ */
1086 {
1087   cache_item_t *ci;
1088
1089   pthread_mutex_lock (&cache_lock);
1090
1091   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1092   if (ci == NULL)
1093   {
1094     pthread_mutex_unlock (&cache_lock);
1095     return (ENOENT);
1096   }
1097
1098   if (ci->values_num > 0)
1099   {
1100     /* Enqueue at head */
1101     enqueue_cache_item (ci, HEAD);
1102     pthread_cond_wait(&ci->flushed, &cache_lock);
1103   }
1104
1105   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1106    * may have been purged during our cond_wait() */
1107
1108   pthread_mutex_unlock(&cache_lock);
1109
1110   return (0);
1111 } /* }}} int flush_file */
1112
1113 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1114 {
1115   char *err = "Syntax error.\n";
1116
1117   if (cmd && cmd->syntax)
1118     err = cmd->syntax;
1119
1120   return send_response(sock, RESP_ERR, "Usage: %s", err);
1121 } /* }}} static int syntax_error() */
1122
1123 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1124 {
1125   uint64_t copy_queue_length;
1126   uint64_t copy_updates_received;
1127   uint64_t copy_flush_received;
1128   uint64_t copy_updates_written;
1129   uint64_t copy_data_sets_written;
1130   uint64_t copy_journal_bytes;
1131   uint64_t copy_journal_rotate;
1132
1133   uint64_t tree_nodes_number;
1134   uint64_t tree_depth;
1135
1136   pthread_mutex_lock (&stats_lock);
1137   copy_queue_length       = stats_queue_length;
1138   copy_updates_received   = stats_updates_received;
1139   copy_flush_received     = stats_flush_received;
1140   copy_updates_written    = stats_updates_written;
1141   copy_data_sets_written  = stats_data_sets_written;
1142   copy_journal_bytes      = stats_journal_bytes;
1143   copy_journal_rotate     = stats_journal_rotate;
1144   pthread_mutex_unlock (&stats_lock);
1145
1146   pthread_mutex_lock (&cache_lock);
1147   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1148   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1149   pthread_mutex_unlock (&cache_lock);
1150
1151   add_response_info(sock,
1152                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1153   add_response_info(sock,
1154                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1155   add_response_info(sock,
1156                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1157   add_response_info(sock,
1158                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1159   add_response_info(sock,
1160                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1161   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1162   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1163   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1164   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1165
1166   send_response(sock, RESP_OK, "Statistics follow\n");
1167
1168   return (0);
1169 } /* }}} int handle_request_stats */
1170
1171 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1172 {
1173   char *file, file_tmp[PATH_MAX];
1174   int status;
1175
1176   status = buffer_get_field (&buffer, &buffer_size, &file);
1177   if (status != 0)
1178   {
1179     return syntax_error(sock,cmd);
1180   }
1181   else
1182   {
1183     pthread_mutex_lock(&stats_lock);
1184     stats_flush_received++;
1185     pthread_mutex_unlock(&stats_lock);
1186
1187     get_abs_path(&file, file_tmp);
1188     if (!check_file_access(file, sock)) return 0;
1189
1190     status = flush_file (file);
1191     if (status == 0)
1192       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1193     else if (status == ENOENT)
1194     {
1195       /* no file in our tree; see whether it exists at all */
1196       struct stat statbuf;
1197
1198       memset(&statbuf, 0, sizeof(statbuf));
1199       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1200         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1201       else
1202         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1203     }
1204     else if (status < 0)
1205       return send_response(sock, RESP_ERR, "Internal error.\n");
1206     else
1207       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1208   }
1209
1210   /* NOTREACHED */
1211   assert(1==0);
1212 } /* }}} int handle_request_flush */
1213
1214 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1215 {
1216   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1217
1218   pthread_mutex_lock(&cache_lock);
1219   flush_old_values(-1);
1220   pthread_mutex_unlock(&cache_lock);
1221
1222   return send_response(sock, RESP_OK, "Started flush.\n");
1223 } /* }}} static int handle_request_flushall */
1224
1225 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1226 {
1227   int status;
1228   char *file, file_tmp[PATH_MAX];
1229   cache_item_t *ci;
1230
1231   status = buffer_get_field(&buffer, &buffer_size, &file);
1232   if (status != 0)
1233     return syntax_error(sock,cmd);
1234
1235   get_abs_path(&file, file_tmp);
1236
1237   pthread_mutex_lock(&cache_lock);
1238   ci = g_tree_lookup(cache_tree, file);
1239   if (ci == NULL)
1240   {
1241     pthread_mutex_unlock(&cache_lock);
1242     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1243   }
1244
1245   for (size_t i=0; i < ci->values_num; i++)
1246     add_response_info(sock, "%s\n", ci->values[i]);
1247
1248   pthread_mutex_unlock(&cache_lock);
1249   return send_response(sock, RESP_OK, "updates pending\n");
1250 } /* }}} static int handle_request_pending */
1251
1252 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1253 {
1254   int status;
1255   gboolean found;
1256   char *file, file_tmp[PATH_MAX];
1257
1258   status = buffer_get_field(&buffer, &buffer_size, &file);
1259   if (status != 0)
1260     return syntax_error(sock,cmd);
1261
1262   get_abs_path(&file, file_tmp);
1263   if (!check_file_access(file, sock)) return 0;
1264
1265   pthread_mutex_lock(&cache_lock);
1266   found = g_tree_remove(cache_tree, file);
1267   pthread_mutex_unlock(&cache_lock);
1268
1269   if (found == TRUE)
1270   {
1271     if (sock != NULL)
1272       journal_write("forget", file);
1273
1274     return send_response(sock, RESP_OK, "Gone!\n");
1275   }
1276   else
1277     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1278
1279   /* NOTREACHED */
1280   assert(1==0);
1281 } /* }}} static int handle_request_forget */
1282
1283 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1284 {
1285   cache_item_t *ci;
1286
1287   pthread_mutex_lock(&cache_lock);
1288
1289   ci = cache_queue_head;
1290   while (ci != NULL)
1291   {
1292     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1293     ci = ci->next;
1294   }
1295
1296   pthread_mutex_unlock(&cache_lock);
1297
1298   return send_response(sock, RESP_OK, "in queue.\n");
1299 } /* }}} int handle_request_queue */
1300
1301 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1302 {
1303   char *file, file_tmp[PATH_MAX];
1304   int values_num = 0;
1305   int status;
1306   char orig_buf[CMD_MAX];
1307
1308   cache_item_t *ci;
1309
1310   /* save it for the journal later */
1311   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1312
1313   status = buffer_get_field (&buffer, &buffer_size, &file);
1314   if (status != 0)
1315     return syntax_error(sock,cmd);
1316
1317   pthread_mutex_lock(&stats_lock);
1318   stats_updates_received++;
1319   pthread_mutex_unlock(&stats_lock);
1320
1321   get_abs_path(&file, file_tmp);
1322   if (!check_file_access(file, sock)) return 0;
1323
1324   pthread_mutex_lock (&cache_lock);
1325   ci = g_tree_lookup (cache_tree, file);
1326
1327   if (ci == NULL) /* {{{ */
1328   {
1329     struct stat statbuf;
1330     cache_item_t *tmp;
1331
1332     /* don't hold the lock while we setup; stat(2) might block */
1333     pthread_mutex_unlock(&cache_lock);
1334
1335     memset (&statbuf, 0, sizeof (statbuf));
1336     status = stat (file, &statbuf);
1337     if (status != 0)
1338     {
1339       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1340
1341       status = errno;
1342       if (status == ENOENT)
1343         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1344       else
1345         return send_response(sock, RESP_ERR,
1346                              "stat failed with error %i.\n", status);
1347     }
1348     if (!S_ISREG (statbuf.st_mode))
1349       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1350
1351     if (access(file, R_OK|W_OK) != 0)
1352       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1353                            file, rrd_strerror(errno));
1354
1355     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1356     if (ci == NULL)
1357     {
1358       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1359
1360       return send_response(sock, RESP_ERR, "malloc failed.\n");
1361     }
1362     memset (ci, 0, sizeof (cache_item_t));
1363
1364     ci->file = strdup (file);
1365     if (ci->file == NULL)
1366     {
1367       free (ci);
1368       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1369
1370       return send_response(sock, RESP_ERR, "strdup failed.\n");
1371     }
1372
1373     wipe_ci_values(ci, now);
1374     ci->flags = CI_FLAGS_IN_TREE;
1375     pthread_cond_init(&ci->flushed, NULL);
1376
1377     pthread_mutex_lock(&cache_lock);
1378
1379     /* another UPDATE might have added this entry in the meantime */
1380     tmp = g_tree_lookup (cache_tree, file);
1381     if (tmp == NULL)
1382       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1383     else
1384     {
1385       free_cache_item (ci);
1386       ci = tmp;
1387     }
1388
1389     /* state may have changed while we were unlocked */
1390     if (state == SHUTDOWN)
1391       return -1;
1392   } /* }}} */
1393   assert (ci != NULL);
1394
1395   /* don't re-write updates in replay mode */
1396   if (sock != NULL)
1397     journal_write("update", orig_buf);
1398
1399   while (buffer_size > 0)
1400   {
1401     char *value;
1402     time_t stamp;
1403     char *eostamp;
1404
1405     status = buffer_get_field (&buffer, &buffer_size, &value);
1406     if (status != 0)
1407     {
1408       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1409       break;
1410     }
1411
1412     /* make sure update time is always moving forward */
1413     stamp = strtol(value, &eostamp, 10);
1414     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1415     {
1416       pthread_mutex_unlock(&cache_lock);
1417       return send_response(sock, RESP_ERR,
1418                            "Cannot find timestamp in '%s'!\n", value);
1419     }
1420     else if (stamp <= ci->last_update_stamp)
1421     {
1422       pthread_mutex_unlock(&cache_lock);
1423       return send_response(sock, RESP_ERR,
1424                            "illegal attempt to update using time %ld when last"
1425                            " update time is %ld (minimum one second step)\n",
1426                            stamp, ci->last_update_stamp);
1427     }
1428     else
1429       ci->last_update_stamp = stamp;
1430
1431     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1432     {
1433       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1434       continue;
1435     }
1436
1437     values_num++;
1438   }
1439
1440   if (((now - ci->last_flush_time) >= config_write_interval)
1441       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1442       && (ci->values_num > 0))
1443   {
1444     enqueue_cache_item (ci, TAIL);
1445   }
1446
1447   pthread_mutex_unlock (&cache_lock);
1448
1449   if (values_num < 1)
1450     return send_response(sock, RESP_ERR, "No values updated.\n");
1451   else
1452     return send_response(sock, RESP_OK,
1453                          "errors, enqueued %i value(s).\n", values_num);
1454
1455   /* NOTREACHED */
1456   assert(1==0);
1457
1458 } /* }}} int handle_request_update */
1459
1460 /* we came across a "WROTE" entry during journal replay.
1461  * throw away any values that we have accumulated for this file
1462  */
1463 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1464 {
1465   cache_item_t *ci;
1466   const char *file = buffer;
1467
1468   pthread_mutex_lock(&cache_lock);
1469
1470   ci = g_tree_lookup(cache_tree, file);
1471   if (ci == NULL)
1472   {
1473     pthread_mutex_unlock(&cache_lock);
1474     return (0);
1475   }
1476
1477   if (ci->values)
1478     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1479
1480   wipe_ci_values(ci, now);
1481   remove_from_queue(ci);
1482
1483   pthread_mutex_unlock(&cache_lock);
1484   return (0);
1485 } /* }}} int handle_request_wrote */
1486
1487 /* start "BATCH" processing */
1488 static int batch_start (HANDLER_PROTO) /* {{{ */
1489 {
1490   int status;
1491   if (sock->batch_start)
1492     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1493
1494   status = send_response(sock, RESP_OK,
1495                          "Go ahead.  End with dot '.' on its own line.\n");
1496   sock->batch_start = time(NULL);
1497   sock->batch_cmd = 0;
1498
1499   return status;
1500 } /* }}} static int batch_start */
1501
1502 /* finish "BATCH" processing and return results to the client */
1503 static int batch_done (HANDLER_PROTO) /* {{{ */
1504 {
1505   assert(sock->batch_start);
1506   sock->batch_start = 0;
1507   sock->batch_cmd  = 0;
1508   return send_response(sock, RESP_OK, "errors\n");
1509 } /* }}} static int batch_done */
1510
1511 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1512 {
1513   return -1;
1514 } /* }}} static int handle_request_quit */
1515
1516 struct command COMMANDS[] = {
1517   {
1518     "UPDATE",
1519     handle_request_update,
1520     PRIV_HIGH,
1521     CMD_CONTEXT_ANY,
1522     "UPDATE <filename> <values> [<values> ...]\n"
1523     ,
1524     "Adds the given file to the internal cache if it is not yet known and\n"
1525     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1526     "for details.\n"
1527     "\n"
1528     "Each <values> has the following form:\n"
1529     "  <values> = <time>:<value>[:<value>[...]]\n"
1530     "See the rrdupdate(1) manpage for details.\n"
1531   },
1532   {
1533     "WROTE",
1534     handle_request_wrote,
1535     PRIV_HIGH,
1536     CMD_CONTEXT_JOURNAL,
1537     NULL,
1538     NULL
1539   },
1540   {
1541     "FLUSH",
1542     handle_request_flush,
1543     PRIV_LOW,
1544     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1545     "FLUSH <filename>\n"
1546     ,
1547     "Adds the given filename to the head of the update queue and returns\n"
1548     "after it has been dequeued.\n"
1549   },
1550   {
1551     "FLUSHALL",
1552     handle_request_flushall,
1553     PRIV_HIGH,
1554     CMD_CONTEXT_CLIENT,
1555     "FLUSHALL\n"
1556     ,
1557     "Triggers writing of all pending updates.  Returns immediately.\n"
1558   },
1559   {
1560     "PENDING",
1561     handle_request_pending,
1562     PRIV_HIGH,
1563     CMD_CONTEXT_CLIENT,
1564     "PENDING <filename>\n"
1565     ,
1566     "Shows any 'pending' updates for a file, in order.\n"
1567     "The updates shown have not yet been written to the underlying RRD file.\n"
1568   },
1569   {
1570     "FORGET",
1571     handle_request_forget,
1572     PRIV_HIGH,
1573     CMD_CONTEXT_ANY,
1574     "FORGET <filename>\n"
1575     ,
1576     "Removes the file completely from the cache.\n"
1577     "Any pending updates for the file will be lost.\n"
1578   },
1579   {
1580     "QUEUE",
1581     handle_request_queue,
1582     PRIV_LOW,
1583     CMD_CONTEXT_CLIENT,
1584     "QUEUE\n"
1585     ,
1586         "Shows all files in the output queue.\n"
1587     "The output is zero or more lines in the following format:\n"
1588     "(where <num_vals> is the number of values to be written)\n"
1589     "\n"
1590     "<num_vals> <filename>\n"
1591   },
1592   {
1593     "STATS",
1594     handle_request_stats,
1595     PRIV_LOW,
1596     CMD_CONTEXT_CLIENT,
1597     "STATS\n"
1598     ,
1599     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1600     "a description of the values.\n"
1601   },
1602   {
1603     "HELP",
1604     handle_request_help,
1605     PRIV_LOW,
1606     CMD_CONTEXT_CLIENT,
1607     "HELP [<command>]\n",
1608     NULL, /* special! */
1609   },
1610   {
1611     "BATCH",
1612     batch_start,
1613     PRIV_LOW,
1614     CMD_CONTEXT_CLIENT,
1615     "BATCH\n"
1616     ,
1617     "The 'BATCH' command permits the client to initiate a bulk load\n"
1618     "   of commands to rrdcached.\n"
1619     "\n"
1620     "Usage:\n"
1621     "\n"
1622     "    client: BATCH\n"
1623     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1624     "    client: command #1\n"
1625     "    client: command #2\n"
1626     "    client: ... and so on\n"
1627     "    client: .\n"
1628     "    server: 2 errors\n"
1629     "    server: 7 message for command #7\n"
1630     "    server: 9 message for command #9\n"
1631     "\n"
1632     "For more information, consult the rrdcached(1) documentation.\n"
1633   },
1634   {
1635     ".",   /* BATCH terminator */
1636     batch_done,
1637     PRIV_LOW,
1638     CMD_CONTEXT_BATCH,
1639     NULL,
1640     NULL
1641   },
1642   {
1643     "QUIT",
1644     handle_request_quit,
1645     PRIV_LOW,
1646     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1647     "QUIT\n"
1648     ,
1649     "Disconnect from rrdcached.\n"
1650   },
1651   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1652 };
1653
1654 static struct command *find_command(char *cmd)
1655 {
1656   struct command *c = COMMANDS;
1657
1658   while (c->cmd != NULL)
1659   {
1660     if (strcasecmp(cmd, c->cmd) == 0)
1661       break;
1662     c++;
1663   }
1664
1665   if (c->cmd == NULL)
1666     return NULL;
1667   else
1668     return c;
1669 }
1670
1671 /* check whether commands are received in the expected context */
1672 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1673 {
1674   if (sock == NULL)
1675     return (cmd->context & CMD_CONTEXT_JOURNAL);
1676   else if (sock->batch_start)
1677     return (cmd->context & CMD_CONTEXT_BATCH);
1678   else
1679     return (cmd->context & CMD_CONTEXT_CLIENT);
1680
1681   /* NOTREACHED */
1682   assert(1==0);
1683 }
1684
1685 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1686 {
1687   int status;
1688   char *cmd_str;
1689   char *resp_txt;
1690   struct command *help = NULL;
1691
1692   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1693   if (status == 0)
1694     help = find_command(cmd_str);
1695
1696   if (help && (help->syntax || help->help))
1697   {
1698     char tmp[CMD_MAX];
1699
1700     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1701     resp_txt = tmp;
1702
1703     if (help->syntax)
1704       add_response_info(sock, "Usage: %s\n", help->syntax);
1705
1706     if (help->help)
1707       add_response_info(sock, "%s\n", help->help);
1708   }
1709   else
1710   {
1711     help = COMMANDS;
1712     resp_txt = "Command overview\n";
1713
1714     while (help->cmd)
1715     {
1716       if (help->syntax)
1717         add_response_info(sock, "%s", help->syntax);
1718       help++;
1719     }
1720   }
1721
1722   return send_response(sock, RESP_OK, resp_txt);
1723 } /* }}} int handle_request_help */
1724
1725 /* if sock==NULL, we are in journal replay mode */
1726 static int handle_request (DISPATCH_PROTO) /* {{{ */
1727 {
1728   char *buffer_ptr = buffer;
1729   char *cmd_str = NULL;
1730   struct command *cmd = NULL;
1731   int status;
1732
1733   assert (buffer[buffer_size - 1] == '\0');
1734
1735   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1736   if (status != 0)
1737   {
1738     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1739     return (-1);
1740   }
1741
1742   if (sock != NULL && sock->batch_start)
1743     sock->batch_cmd++;
1744
1745   cmd = find_command(cmd_str);
1746   if (!cmd)
1747     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1748
1749   status = has_privilege(sock, cmd->min_priv);
1750   if (status <= 0)
1751     return status;
1752
1753   if (!command_check_context(sock, cmd))
1754     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1755
1756   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1757 } /* }}} int handle_request */
1758
1759 static void journal_set_free (journal_set *js) /* {{{ */
1760 {
1761   if (js == NULL)
1762     return;
1763
1764   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1765
1766   free(js);
1767 } /* }}} journal_set_free */
1768
1769 static void journal_set_remove (journal_set *js) /* {{{ */
1770 {
1771   if (js == NULL)
1772     return;
1773
1774   for (uint i=0; i < js->files_num; i++)
1775   {
1776     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1777     unlink(js->files[i]);
1778   }
1779 } /* }}} journal_set_remove */
1780
1781 /* close current journal file handle.
1782  * MUST hold journal_lock before calling */
1783 static void journal_close(void) /* {{{ */
1784 {
1785   if (journal_fh != NULL)
1786   {
1787     if (fclose(journal_fh) != 0)
1788       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1789   }
1790
1791   journal_fh = NULL;
1792   journal_size = 0;
1793 } /* }}} journal_close */
1794
1795 /* MUST hold journal_lock before calling */
1796 static void journal_new_file(void) /* {{{ */
1797 {
1798   struct timeval now;
1799   int  new_fd;
1800   char new_file[PATH_MAX + 1];
1801
1802   assert(journal_dir != NULL);
1803   assert(journal_cur != NULL);
1804
1805   journal_close();
1806
1807   gettimeofday(&now, NULL);
1808   /* this format assures that the files sort in strcmp() order */
1809   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1810            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1811
1812   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1813                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1814   if (new_fd < 0)
1815     goto error;
1816
1817   journal_fh = fdopen(new_fd, "a");
1818   if (journal_fh == NULL)
1819     goto error;
1820
1821   journal_size = ftell(journal_fh);
1822   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1823
1824   /* record the file in the journal set */
1825   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1826
1827   return;
1828
1829 error:
1830   RRDD_LOG(LOG_CRIT,
1831            "JOURNALING DISABLED: Error while trying to create %s : %s",
1832            new_file, rrd_strerror(errno));
1833   RRDD_LOG(LOG_CRIT,
1834            "JOURNALING DISABLED: All values will be flushed at shutdown");
1835
1836   close(new_fd);
1837   config_flush_at_shutdown = 1;
1838
1839 } /* }}} journal_new_file */
1840
1841 /* MUST NOT hold journal_lock before calling this */
1842 static void journal_rotate(void) /* {{{ */
1843 {
1844   journal_set *old_js = NULL;
1845
1846   if (journal_dir == NULL)
1847     return;
1848
1849   RRDD_LOG(LOG_DEBUG, "rotating journals");
1850
1851   pthread_mutex_lock(&stats_lock);
1852   ++stats_journal_rotate;
1853   pthread_mutex_unlock(&stats_lock);
1854
1855   pthread_mutex_lock(&journal_lock);
1856
1857   journal_close();
1858
1859   /* rotate the journal sets */
1860   old_js = journal_old;
1861   journal_old = journal_cur;
1862   journal_cur = calloc(1, sizeof(journal_set));
1863
1864   if (journal_cur != NULL)
1865     journal_new_file();
1866   else
1867     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1868
1869   pthread_mutex_unlock(&journal_lock);
1870
1871   journal_set_remove(old_js);
1872   journal_set_free  (old_js);
1873
1874 } /* }}} static void journal_rotate */
1875
1876 /* MUST hold journal_lock when calling */
1877 static void journal_done(void) /* {{{ */
1878 {
1879   if (journal_cur == NULL)
1880     return;
1881
1882   journal_close();
1883
1884   if (config_flush_at_shutdown)
1885   {
1886     RRDD_LOG(LOG_INFO, "removing journals");
1887     journal_set_remove(journal_old);
1888     journal_set_remove(journal_cur);
1889   }
1890   else
1891   {
1892     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1893              "journals will be used at next startup");
1894   }
1895
1896   journal_set_free(journal_cur);
1897   journal_set_free(journal_old);
1898   free(journal_dir);
1899
1900 } /* }}} static void journal_done */
1901
1902 static int journal_write(char *cmd, char *args) /* {{{ */
1903 {
1904   int chars;
1905
1906   if (journal_fh == NULL)
1907     return 0;
1908
1909   pthread_mutex_lock(&journal_lock);
1910   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1911   journal_size += chars;
1912
1913   if (journal_size > JOURNAL_MAX)
1914     journal_new_file();
1915
1916   pthread_mutex_unlock(&journal_lock);
1917
1918   if (chars > 0)
1919   {
1920     pthread_mutex_lock(&stats_lock);
1921     stats_journal_bytes += chars;
1922     pthread_mutex_unlock(&stats_lock);
1923   }
1924
1925   return chars;
1926 } /* }}} static int journal_write */
1927
1928 static int journal_replay (const char *file) /* {{{ */
1929 {
1930   FILE *fh;
1931   int entry_cnt = 0;
1932   int fail_cnt = 0;
1933   uint64_t line = 0;
1934   char entry[CMD_MAX];
1935   time_t now;
1936
1937   if (file == NULL) return 0;
1938
1939   {
1940     char *reason = "unknown error";
1941     int status = 0;
1942     struct stat statbuf;
1943
1944     memset(&statbuf, 0, sizeof(statbuf));
1945     if (stat(file, &statbuf) != 0)
1946     {
1947       reason = "stat error";
1948       status = errno;
1949     }
1950     else if (!S_ISREG(statbuf.st_mode))
1951     {
1952       reason = "not a regular file";
1953       status = EPERM;
1954     }
1955     if (statbuf.st_uid != daemon_uid)
1956     {
1957       reason = "not owned by daemon user";
1958       status = EACCES;
1959     }
1960     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1961     {
1962       reason = "must not be user/group writable";
1963       status = EACCES;
1964     }
1965
1966     if (status != 0)
1967     {
1968       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1969                file, rrd_strerror(status), reason);
1970       return 0;
1971     }
1972   }
1973
1974   fh = fopen(file, "r");
1975   if (fh == NULL)
1976   {
1977     if (errno != ENOENT)
1978       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1979                file, rrd_strerror(errno));
1980     return 0;
1981   }
1982   else
1983     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1984
1985   now = time(NULL);
1986
1987   while(!feof(fh))
1988   {
1989     size_t entry_len;
1990
1991     ++line;
1992     if (fgets(entry, sizeof(entry), fh) == NULL)
1993       break;
1994     entry_len = strlen(entry);
1995
1996     /* check \n termination in case journal writing crashed mid-line */
1997     if (entry_len == 0)
1998       continue;
1999     else if (entry[entry_len - 1] != '\n')
2000     {
2001       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2002       ++fail_cnt;
2003       continue;
2004     }
2005
2006     entry[entry_len - 1] = '\0';
2007
2008     if (handle_request(NULL, now, entry, entry_len) == 0)
2009       ++entry_cnt;
2010     else
2011       ++fail_cnt;
2012   }
2013
2014   fclose(fh);
2015
2016   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2017            entry_cnt, fail_cnt);
2018
2019   return entry_cnt > 0 ? 1 : 0;
2020 } /* }}} static int journal_replay */
2021
2022 static int journal_sort(const void *v1, const void *v2)
2023 {
2024   char **jn1 = (char **) v1;
2025   char **jn2 = (char **) v2;
2026
2027   return strcmp(*jn1,*jn2);
2028 }
2029
2030 static void journal_init(void) /* {{{ */
2031 {
2032   int had_journal = 0;
2033   DIR *dir;
2034   struct dirent *dent;
2035   char path[PATH_MAX+1];
2036
2037   if (journal_dir == NULL) return;
2038
2039   pthread_mutex_lock(&journal_lock);
2040
2041   journal_cur = calloc(1, sizeof(journal_set));
2042   if (journal_cur == NULL)
2043   {
2044     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2045     return;
2046   }
2047
2048   RRDD_LOG(LOG_INFO, "checking for journal files");
2049
2050   /* Handle old journal files during transition.  This gives them the
2051    * correct sort order.  TODO: remove after first release
2052    */
2053   {
2054     char old_path[PATH_MAX+1];
2055     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2056     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2057     rename(old_path, path);
2058
2059     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2060     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2061     rename(old_path, path);
2062   }
2063
2064   dir = opendir(journal_dir);
2065   while ((dent = readdir(dir)) != NULL)
2066   {
2067     /* looks like a journal file? */
2068     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2069       continue;
2070
2071     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2072
2073     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2074     {
2075       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2076                dent->d_name);
2077       break;
2078     }
2079   }
2080   closedir(dir);
2081
2082   qsort(journal_cur->files, journal_cur->files_num,
2083         sizeof(journal_cur->files[0]), journal_sort);
2084
2085   for (uint i=0; i < journal_cur->files_num; i++)
2086     had_journal += journal_replay(journal_cur->files[i]);
2087
2088   journal_new_file();
2089
2090   /* it must have been a crash.  start a flush */
2091   if (had_journal && config_flush_at_shutdown)
2092     flush_old_values(-1);
2093
2094   pthread_mutex_unlock(&journal_lock);
2095
2096   RRDD_LOG(LOG_INFO, "journal processing complete");
2097
2098 } /* }}} static void journal_init */
2099
2100 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2101 {
2102   assert(sock != NULL);
2103
2104   free(sock->rbuf);  sock->rbuf = NULL;
2105   free(sock->wbuf);  sock->wbuf = NULL;
2106   free(sock);
2107 } /* }}} void free_listen_socket */
2108
2109 static void close_connection(listen_socket_t *sock) /* {{{ */
2110 {
2111   if (sock->fd >= 0)
2112   {
2113     close(sock->fd);
2114     sock->fd = -1;
2115   }
2116
2117   free_listen_socket(sock);
2118
2119 } /* }}} void close_connection */
2120
2121 static void *connection_thread_main (void *args) /* {{{ */
2122 {
2123   listen_socket_t *sock;
2124   int fd;
2125
2126   sock = (listen_socket_t *) args;
2127   fd = sock->fd;
2128
2129   /* init read buffers */
2130   sock->next_read = sock->next_cmd = 0;
2131   sock->rbuf = malloc(RBUF_SIZE);
2132   if (sock->rbuf == NULL)
2133   {
2134     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2135     close_connection(sock);
2136     return NULL;
2137   }
2138
2139   pthread_mutex_lock (&connection_threads_lock);
2140   connection_threads_num++;
2141   pthread_mutex_unlock (&connection_threads_lock);
2142
2143   while (state == RUNNING)
2144   {
2145     char *cmd;
2146     ssize_t cmd_len;
2147     ssize_t rbytes;
2148     time_t now;
2149
2150     struct pollfd pollfd;
2151     int status;
2152
2153     pollfd.fd = fd;
2154     pollfd.events = POLLIN | POLLPRI;
2155     pollfd.revents = 0;
2156
2157     status = poll (&pollfd, 1, /* timeout = */ 500);
2158     if (state != RUNNING)
2159       break;
2160     else if (status == 0) /* timeout */
2161       continue;
2162     else if (status < 0) /* error */
2163     {
2164       status = errno;
2165       if (status != EINTR)
2166         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2167       continue;
2168     }
2169
2170     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2171       break;
2172     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2173     {
2174       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2175           "poll(2) returned something unexpected: %#04hx",
2176           pollfd.revents);
2177       break;
2178     }
2179
2180     rbytes = read(fd, sock->rbuf + sock->next_read,
2181                   RBUF_SIZE - sock->next_read);
2182     if (rbytes < 0)
2183     {
2184       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2185       break;
2186     }
2187     else if (rbytes == 0)
2188       break; /* eof */
2189
2190     sock->next_read += rbytes;
2191
2192     if (sock->batch_start)
2193       now = sock->batch_start;
2194     else
2195       now = time(NULL);
2196
2197     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2198     {
2199       status = handle_request (sock, now, cmd, cmd_len+1);
2200       if (status != 0)
2201         goto out_close;
2202     }
2203   }
2204
2205 out_close:
2206   close_connection(sock);
2207
2208   /* Remove this thread from the connection threads list */
2209   pthread_mutex_lock (&connection_threads_lock);
2210   connection_threads_num--;
2211   if (connection_threads_num <= 0)
2212     pthread_cond_broadcast(&connection_threads_done);
2213   pthread_mutex_unlock (&connection_threads_lock);
2214
2215   return (NULL);
2216 } /* }}} void *connection_thread_main */
2217
2218 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2219 {
2220   int fd;
2221   struct sockaddr_un sa;
2222   listen_socket_t *temp;
2223   int status;
2224   const char *path;
2225
2226   path = sock->addr;
2227   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2228     path += strlen("unix:");
2229
2230   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2231       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2232   if (temp == NULL)
2233   {
2234     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2235     return (-1);
2236   }
2237   listen_fds = temp;
2238   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2239
2240   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2241   if (fd < 0)
2242   {
2243     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2244              rrd_strerror(errno));
2245     return (-1);
2246   }
2247
2248   memset (&sa, 0, sizeof (sa));
2249   sa.sun_family = AF_UNIX;
2250   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2251
2252   /* if we've gotten this far, we own the pid file.  any daemon started
2253    * with the same args must not be alive.  therefore, ensure that we can
2254    * create the socket...
2255    */
2256   unlink(path);
2257
2258   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2259   if (status != 0)
2260   {
2261     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2262              path, rrd_strerror(errno));
2263     close (fd);
2264     return (-1);
2265   }
2266
2267   status = listen (fd, /* backlog = */ 10);
2268   if (status != 0)
2269   {
2270     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2271              path, rrd_strerror(errno));
2272     close (fd);
2273     unlink (path);
2274     return (-1);
2275   }
2276
2277   listen_fds[listen_fds_num].fd = fd;
2278   listen_fds[listen_fds_num].family = PF_UNIX;
2279   strncpy(listen_fds[listen_fds_num].addr, path,
2280           sizeof (listen_fds[listen_fds_num].addr) - 1);
2281   listen_fds_num++;
2282
2283   return (0);
2284 } /* }}} int open_listen_socket_unix */
2285
2286 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2287 {
2288   struct addrinfo ai_hints;
2289   struct addrinfo *ai_res;
2290   struct addrinfo *ai_ptr;
2291   char addr_copy[NI_MAXHOST];
2292   char *addr;
2293   char *port;
2294   int status;
2295
2296   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2297   addr_copy[sizeof (addr_copy) - 1] = 0;
2298   addr = addr_copy;
2299
2300   memset (&ai_hints, 0, sizeof (ai_hints));
2301   ai_hints.ai_flags = 0;
2302 #ifdef AI_ADDRCONFIG
2303   ai_hints.ai_flags |= AI_ADDRCONFIG;
2304 #endif
2305   ai_hints.ai_family = AF_UNSPEC;
2306   ai_hints.ai_socktype = SOCK_STREAM;
2307
2308   port = NULL;
2309   if (*addr == '[') /* IPv6+port format */
2310   {
2311     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2312     addr++;
2313
2314     port = strchr (addr, ']');
2315     if (port == NULL)
2316     {
2317       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2318       return (-1);
2319     }
2320     *port = 0;
2321     port++;
2322
2323     if (*port == ':')
2324       port++;
2325     else if (*port == 0)
2326       port = NULL;
2327     else
2328     {
2329       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2330       return (-1);
2331     }
2332   } /* if (*addr = ']') */
2333   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2334   {
2335     port = rindex(addr, ':');
2336     if (port != NULL)
2337     {
2338       *port = 0;
2339       port++;
2340     }
2341   }
2342   ai_res = NULL;
2343   status = getaddrinfo (addr,
2344                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2345                         &ai_hints, &ai_res);
2346   if (status != 0)
2347   {
2348     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2349              addr, gai_strerror (status));
2350     return (-1);
2351   }
2352
2353   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2354   {
2355     int fd;
2356     listen_socket_t *temp;
2357     int one = 1;
2358
2359     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2360         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2361     if (temp == NULL)
2362     {
2363       fprintf (stderr,
2364                "rrdcached: open_listen_socket_network: realloc failed.\n");
2365       continue;
2366     }
2367     listen_fds = temp;
2368     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2369
2370     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2371     if (fd < 0)
2372     {
2373       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2374                rrd_strerror(errno));
2375       continue;
2376     }
2377
2378     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2379
2380     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2381     if (status != 0)
2382     {
2383       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2384                sock->addr, rrd_strerror(errno));
2385       close (fd);
2386       continue;
2387     }
2388
2389     status = listen (fd, /* backlog = */ 10);
2390     if (status != 0)
2391     {
2392       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2393                sock->addr, rrd_strerror(errno));
2394       close (fd);
2395       freeaddrinfo(ai_res);
2396       return (-1);
2397     }
2398
2399     listen_fds[listen_fds_num].fd = fd;
2400     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2401     listen_fds_num++;
2402   } /* for (ai_ptr) */
2403
2404   freeaddrinfo(ai_res);
2405   return (0);
2406 } /* }}} static int open_listen_socket_network */
2407
2408 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2409 {
2410   assert(sock != NULL);
2411   assert(sock->addr != NULL);
2412
2413   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2414       || sock->addr[0] == '/')
2415     return (open_listen_socket_unix(sock));
2416   else
2417     return (open_listen_socket_network(sock));
2418 } /* }}} int open_listen_socket */
2419
2420 static int close_listen_sockets (void) /* {{{ */
2421 {
2422   size_t i;
2423
2424   for (i = 0; i < listen_fds_num; i++)
2425   {
2426     close (listen_fds[i].fd);
2427
2428     if (listen_fds[i].family == PF_UNIX)
2429       unlink(listen_fds[i].addr);
2430   }
2431
2432   free (listen_fds);
2433   listen_fds = NULL;
2434   listen_fds_num = 0;
2435
2436   return (0);
2437 } /* }}} int close_listen_sockets */
2438
2439 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2440 {
2441   struct pollfd *pollfds;
2442   int pollfds_num;
2443   int status;
2444   int i;
2445
2446   if (listen_fds_num < 1)
2447   {
2448     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2449     return (NULL);
2450   }
2451
2452   pollfds_num = listen_fds_num;
2453   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2454   if (pollfds == NULL)
2455   {
2456     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2457     return (NULL);
2458   }
2459   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2460
2461   RRDD_LOG(LOG_INFO, "listening for connections");
2462
2463   while (state == RUNNING)
2464   {
2465     for (i = 0; i < pollfds_num; i++)
2466     {
2467       pollfds[i].fd = listen_fds[i].fd;
2468       pollfds[i].events = POLLIN | POLLPRI;
2469       pollfds[i].revents = 0;
2470     }
2471
2472     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2473     if (state != RUNNING)
2474       break;
2475     else if (status == 0) /* timeout */
2476       continue;
2477     else if (status < 0) /* error */
2478     {
2479       status = errno;
2480       if (status != EINTR)
2481       {
2482         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2483       }
2484       continue;
2485     }
2486
2487     for (i = 0; i < pollfds_num; i++)
2488     {
2489       listen_socket_t *client_sock;
2490       struct sockaddr_storage client_sa;
2491       socklen_t client_sa_size;
2492       pthread_t tid;
2493       pthread_attr_t attr;
2494
2495       if (pollfds[i].revents == 0)
2496         continue;
2497
2498       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2499       {
2500         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2501             "poll(2) returned something unexpected for listen FD #%i.",
2502             pollfds[i].fd);
2503         continue;
2504       }
2505
2506       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2507       if (client_sock == NULL)
2508       {
2509         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2510         continue;
2511       }
2512       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2513
2514       client_sa_size = sizeof (client_sa);
2515       client_sock->fd = accept (pollfds[i].fd,
2516           (struct sockaddr *) &client_sa, &client_sa_size);
2517       if (client_sock->fd < 0)
2518       {
2519         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2520         free(client_sock);
2521         continue;
2522       }
2523
2524       pthread_attr_init (&attr);
2525       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2526
2527       status = pthread_create (&tid, &attr, connection_thread_main,
2528                                client_sock);
2529       if (status != 0)
2530       {
2531         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2532         close_connection(client_sock);
2533         continue;
2534       }
2535     } /* for (pollfds_num) */
2536   } /* while (state == RUNNING) */
2537
2538   RRDD_LOG(LOG_INFO, "starting shutdown");
2539
2540   close_listen_sockets ();
2541
2542   pthread_mutex_lock (&connection_threads_lock);
2543   while (connection_threads_num > 0)
2544     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2545   pthread_mutex_unlock (&connection_threads_lock);
2546
2547   free(pollfds);
2548
2549   return (NULL);
2550 } /* }}} void *listen_thread_main */
2551
2552 static int daemonize (void) /* {{{ */
2553 {
2554   int pid_fd;
2555   char *base_dir;
2556
2557   daemon_uid = geteuid();
2558
2559   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2560   if (pid_fd < 0)
2561     pid_fd = check_pidfile();
2562   if (pid_fd < 0)
2563     return pid_fd;
2564
2565   /* open all the listen sockets */
2566   if (config_listen_address_list_len > 0)
2567   {
2568     for (size_t i = 0; i < config_listen_address_list_len; i++)
2569       open_listen_socket (config_listen_address_list[i]);
2570
2571     rrd_free_ptrs((void ***) &config_listen_address_list,
2572                   &config_listen_address_list_len);
2573   }
2574   else
2575   {
2576     listen_socket_t sock;
2577     memset(&sock, 0, sizeof(sock));
2578     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2579     open_listen_socket (&sock);
2580   }
2581
2582   if (listen_fds_num < 1)
2583   {
2584     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2585     goto error;
2586   }
2587
2588   if (!stay_foreground)
2589   {
2590     pid_t child;
2591
2592     child = fork ();
2593     if (child < 0)
2594     {
2595       fprintf (stderr, "daemonize: fork(2) failed.\n");
2596       goto error;
2597     }
2598     else if (child > 0)
2599       exit(0);
2600
2601     /* Become session leader */
2602     setsid ();
2603
2604     /* Open the first three file descriptors to /dev/null */
2605     close (2);
2606     close (1);
2607     close (0);
2608
2609     open ("/dev/null", O_RDWR);
2610     if (dup(0) == -1 || dup(0) == -1){
2611         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2612     }
2613   } /* if (!stay_foreground) */
2614
2615   /* Change into the /tmp directory. */
2616   base_dir = (config_base_dir != NULL)
2617     ? config_base_dir
2618     : "/tmp";
2619
2620   if (chdir (base_dir) != 0)
2621   {
2622     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2623     goto error;
2624   }
2625
2626   install_signal_handlers();
2627
2628   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2629   RRDD_LOG(LOG_INFO, "starting up");
2630
2631   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2632                                 (GDestroyNotify) free_cache_item);
2633   if (cache_tree == NULL)
2634   {
2635     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2636     goto error;
2637   }
2638
2639   return write_pidfile (pid_fd);
2640
2641 error:
2642   remove_pidfile();
2643   return -1;
2644 } /* }}} int daemonize */
2645
2646 static int cleanup (void) /* {{{ */
2647 {
2648   pthread_cond_broadcast (&flush_cond);
2649   pthread_join (flush_thread, NULL);
2650
2651   pthread_cond_broadcast (&queue_cond);
2652   for (int i = 0; i < config_queue_threads; i++)
2653     pthread_join (queue_threads[i], NULL);
2654
2655   if (config_flush_at_shutdown)
2656   {
2657     assert(cache_queue_head == NULL);
2658     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2659   }
2660
2661   free(queue_threads);
2662   free(config_base_dir);
2663   free(config_pid_file);
2664
2665   pthread_mutex_lock(&cache_lock);
2666   g_tree_destroy(cache_tree);
2667
2668   pthread_mutex_lock(&journal_lock);
2669   journal_done();
2670
2671   RRDD_LOG(LOG_INFO, "goodbye");
2672   closelog ();
2673
2674   remove_pidfile ();
2675
2676   return (0);
2677 } /* }}} int cleanup */
2678
2679 static int read_options (int argc, char **argv) /* {{{ */
2680 {
2681   int option;
2682   int status = 0;
2683
2684   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2685   {
2686     switch (option)
2687     {
2688       case 'g':
2689         stay_foreground=1;
2690         break;
2691
2692       case 'L':
2693       case 'l':
2694       {
2695         listen_socket_t *new;
2696
2697         new = malloc(sizeof(listen_socket_t));
2698         if (new == NULL)
2699         {
2700           fprintf(stderr, "read_options: malloc failed.\n");
2701           return(2);
2702         }
2703         memset(new, 0, sizeof(listen_socket_t));
2704
2705         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2706         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2707
2708         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2709                          &config_listen_address_list_len, new))
2710         {
2711           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2712           return (2);
2713         }
2714       }
2715       break;
2716
2717       case 'f':
2718       {
2719         int temp;
2720
2721         temp = atoi (optarg);
2722         if (temp > 0)
2723           config_flush_interval = temp;
2724         else
2725         {
2726           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2727           status = 3;
2728         }
2729       }
2730       break;
2731
2732       case 'w':
2733       {
2734         int temp;
2735
2736         temp = atoi (optarg);
2737         if (temp > 0)
2738           config_write_interval = temp;
2739         else
2740         {
2741           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2742           status = 2;
2743         }
2744       }
2745       break;
2746
2747       case 'z':
2748       {
2749         int temp;
2750
2751         temp = atoi(optarg);
2752         if (temp > 0)
2753           config_write_jitter = temp;
2754         else
2755         {
2756           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2757           status = 2;
2758         }
2759
2760         break;
2761       }
2762
2763       case 't':
2764       {
2765         int threads;
2766         threads = atoi(optarg);
2767         if (threads >= 1)
2768           config_queue_threads = threads;
2769         else
2770         {
2771           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2772           return 1;
2773         }
2774       }
2775       break;
2776
2777       case 'B':
2778         config_write_base_only = 1;
2779         break;
2780
2781       case 'b':
2782       {
2783         size_t len;
2784         char base_realpath[PATH_MAX];
2785
2786         if (config_base_dir != NULL)
2787           free (config_base_dir);
2788         config_base_dir = strdup (optarg);
2789         if (config_base_dir == NULL)
2790         {
2791           fprintf (stderr, "read_options: strdup failed.\n");
2792           return (3);
2793         }
2794
2795         /* make sure that the base directory is not resolved via
2796          * symbolic links.  this makes some performance-enhancing
2797          * assumptions possible (we don't have to resolve paths
2798          * that start with a "/")
2799          */
2800         if (realpath(config_base_dir, base_realpath) == NULL)
2801         {
2802           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2803           return 5;
2804         }
2805         else if (strncmp(config_base_dir,
2806                          base_realpath, sizeof(base_realpath)) != 0)
2807         {
2808           fprintf(stderr,
2809                   "Base directory (-b) resolved via file system links!\n"
2810                   "Please consult rrdcached '-b' documentation!\n"
2811                   "Consider specifying the real directory (%s)\n",
2812                   base_realpath);
2813           return 5;
2814         }
2815
2816         len = strlen (config_base_dir);
2817         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2818         {
2819           config_base_dir[len - 1] = 0;
2820           len--;
2821         }
2822
2823         if (len < 1)
2824         {
2825           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2826           return (4);
2827         }
2828
2829         _config_base_dir_len = len;
2830       }
2831       break;
2832
2833       case 'p':
2834       {
2835         if (config_pid_file != NULL)
2836           free (config_pid_file);
2837         config_pid_file = strdup (optarg);
2838         if (config_pid_file == NULL)
2839         {
2840           fprintf (stderr, "read_options: strdup failed.\n");
2841           return (3);
2842         }
2843       }
2844       break;
2845
2846       case 'F':
2847         config_flush_at_shutdown = 1;
2848         break;
2849
2850       case 'j':
2851       {
2852         struct stat statbuf;
2853         const char *dir = journal_dir = strdup(optarg);
2854
2855         status = stat(dir, &statbuf);
2856         if (status != 0)
2857         {
2858           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2859           return 6;
2860         }
2861
2862         if (!S_ISDIR(statbuf.st_mode)
2863             || access(dir, R_OK|W_OK|X_OK) != 0)
2864         {
2865           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2866                   errno ? rrd_strerror(errno) : "");
2867           return 6;
2868         }
2869       }
2870       break;
2871
2872       case 'h':
2873       case '?':
2874         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2875             "\n"
2876             "Usage: rrdcached [options]\n"
2877             "\n"
2878             "Valid options are:\n"
2879             "  -l <address>  Socket address to listen to.\n"
2880             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2881             "  -w <seconds>  Interval in which to write data.\n"
2882             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2883             "  -t <threads>  Number of write threads.\n"
2884             "  -f <seconds>  Interval in which to flush dead data.\n"
2885             "  -p <file>     Location of the PID-file.\n"
2886             "  -b <dir>      Base directory to change to.\n"
2887             "  -B            Restrict file access to paths within -b <dir>\n"
2888             "  -g            Do not fork and run in the foreground.\n"
2889             "  -j <dir>      Directory in which to create the journal files.\n"
2890             "  -F            Always flush all updates at shutdown\n"
2891             "\n"
2892             "For more information and a detailed description of all options "
2893             "please refer\n"
2894             "to the rrdcached(1) manual page.\n",
2895             VERSION);
2896         status = -1;
2897         break;
2898     } /* switch (option) */
2899   } /* while (getopt) */
2900
2901   /* advise the user when values are not sane */
2902   if (config_flush_interval < 2 * config_write_interval)
2903     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2904             " 2x write interval (-w) !\n");
2905   if (config_write_jitter > config_write_interval)
2906     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2907             " write interval (-w) !\n");
2908
2909   if (config_write_base_only && config_base_dir == NULL)
2910     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2911             "  Consult the rrdcached documentation\n");
2912
2913   if (journal_dir == NULL)
2914     config_flush_at_shutdown = 1;
2915
2916   return (status);
2917 } /* }}} int read_options */
2918
2919 int main (int argc, char **argv)
2920 {
2921   int status;
2922
2923   status = read_options (argc, argv);
2924   if (status != 0)
2925   {
2926     if (status < 0)
2927       status = 0;
2928     return (status);
2929   }
2930
2931   status = daemonize ();
2932   if (status != 0)
2933   {
2934     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2935     return (1);
2936   }
2937
2938   journal_init();
2939
2940   /* start the queue threads */
2941   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2942   if (queue_threads == NULL)
2943   {
2944     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2945     cleanup();
2946     return (1);
2947   }
2948   for (int i = 0; i < config_queue_threads; i++)
2949   {
2950     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2951     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2952     if (status != 0)
2953     {
2954       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2955       cleanup();
2956       return (1);
2957     }
2958   }
2959
2960   /* start the flush thread */
2961   memset(&flush_thread, 0, sizeof(flush_thread));
2962   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2963   if (status != 0)
2964   {
2965     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2966     cleanup();
2967     return (1);
2968   }
2969
2970   listen_thread_main (NULL);
2971   cleanup ();
2972
2973   return (0);
2974 } /* int main */
2975
2976 /*
2977  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2978  */