Add a "FETCH" command to RRDCacheD which behaves like a (simplified
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
110
111 #include <glib-2.0/glib.h>
112 /* }}} */
113
114 #define RRDD_LOG(severity, ...) \
115   do { \
116     if (stay_foreground) \
117       fprintf(stderr, __VA_ARGS__); \
118     syslog ((severity), __VA_ARGS__); \
119   } while (0)
120
121 #ifndef __GNUC__
122 # define __attribute__(x) /**/
123 #endif
124
125 /*
126  * Types
127  */
128 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
129
130 struct listen_socket_s
131 {
132   int fd;
133   char addr[PATH_MAX + 1];
134   int family;
135
136   /* state for BATCH processing */
137   time_t batch_start;
138   int batch_cmd;
139
140   /* buffered IO */
141   char *rbuf;
142   off_t next_cmd;
143   off_t next_read;
144
145   char *wbuf;
146   ssize_t wbuf_len;
147
148   uint32_t permissions;
149
150   gid_t  socket_group;
151   mode_t socket_permissions;
152 };
153 typedef struct listen_socket_s listen_socket_t;
154
155 struct command_s;
156 typedef struct command_s command_t;
157 /* note: guard against "unused" warnings in the handlers */
158 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
159                         time_t now              __attribute__((unused)),\
160                         char  *buffer           __attribute__((unused)),\
161                         size_t buffer_size      __attribute__((unused))
162
163 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
164                         DISPATCH_PROTO
165
166 struct command_s {
167   char   *cmd;
168   int (*handler)(HANDLER_PROTO);
169
170   char  context;                /* where we expect to see it */
171 #define CMD_CONTEXT_CLIENT      (1<<0)
172 #define CMD_CONTEXT_BATCH       (1<<1)
173 #define CMD_CONTEXT_JOURNAL     (1<<2)
174 #define CMD_CONTEXT_ANY         (0x7f)
175
176   char *syntax;
177   char *help;
178 };
179
180 struct cache_item_s;
181 typedef struct cache_item_s cache_item_t;
182 struct cache_item_s
183 {
184   char *file;
185   char **values;
186   size_t values_num;            /* number of valid pointers */
187   size_t values_alloc;          /* number of allocated pointers */
188   time_t last_flush_time;
189   time_t last_update_stamp;
190 #define CI_FLAGS_IN_TREE  (1<<0)
191 #define CI_FLAGS_IN_QUEUE (1<<1)
192   int flags;
193   pthread_cond_t  flushed;
194   cache_item_t *prev;
195   cache_item_t *next;
196 };
197
198 struct callback_flush_data_s
199 {
200   time_t now;
201   time_t abs_timeout;
202   char **keys;
203   size_t keys_num;
204 };
205 typedef struct callback_flush_data_s callback_flush_data_t;
206
207 enum queue_side_e
208 {
209   HEAD,
210   TAIL
211 };
212 typedef enum queue_side_e queue_side_t;
213
214 /* describe a set of journal files */
215 typedef struct {
216   char **files;
217   size_t files_num;
218 } journal_set;
219
220 /* max length of socket command or response */
221 #define CMD_MAX 4096
222 #define RBUF_SIZE (CMD_MAX*2)
223
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
229
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
232
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
238
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
242
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
249
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
265
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
268
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
277
278 /* Journaled updates */
279 #define JOURNAL_REPLAY(s) ((s) == NULL)
280 #define JOURNAL_BASE "rrd.journal"
281 static journal_set *journal_cur = NULL;
282 static journal_set *journal_old = NULL;
283 static char *journal_dir = NULL;
284 static FILE *journal_fh = NULL;         /* current journal file handle */
285 static long  journal_size = 0;          /* current journal size */
286 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
287 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
288 static int journal_write(char *cmd, char *args);
289 static void journal_done(void);
290 static void journal_rotate(void);
291
292 /* prototypes for forward refernces */
293 static int handle_request_help (HANDLER_PROTO);
294
295 /* 
296  * Functions
297  */
298 static void sig_common (const char *sig) /* {{{ */
299 {
300   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
301   state = FLUSHING;
302   pthread_cond_broadcast(&flush_cond);
303   pthread_cond_broadcast(&queue_cond);
304 } /* }}} void sig_common */
305
306 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
307 {
308   sig_common("INT");
309 } /* }}} void sig_int_handler */
310
311 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
312 {
313   sig_common("TERM");
314 } /* }}} void sig_term_handler */
315
316 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
317 {
318   config_flush_at_shutdown = 1;
319   sig_common("USR1");
320 } /* }}} void sig_usr1_handler */
321
322 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
323 {
324   config_flush_at_shutdown = 0;
325   sig_common("USR2");
326 } /* }}} void sig_usr2_handler */
327
328 static void install_signal_handlers(void) /* {{{ */
329 {
330   /* These structures are static, because `sigaction' behaves weird if the are
331    * overwritten.. */
332   static struct sigaction sa_int;
333   static struct sigaction sa_term;
334   static struct sigaction sa_pipe;
335   static struct sigaction sa_usr1;
336   static struct sigaction sa_usr2;
337
338   /* Install signal handlers */
339   memset (&sa_int, 0, sizeof (sa_int));
340   sa_int.sa_handler = sig_int_handler;
341   sigaction (SIGINT, &sa_int, NULL);
342
343   memset (&sa_term, 0, sizeof (sa_term));
344   sa_term.sa_handler = sig_term_handler;
345   sigaction (SIGTERM, &sa_term, NULL);
346
347   memset (&sa_pipe, 0, sizeof (sa_pipe));
348   sa_pipe.sa_handler = SIG_IGN;
349   sigaction (SIGPIPE, &sa_pipe, NULL);
350
351   memset (&sa_pipe, 0, sizeof (sa_usr1));
352   sa_usr1.sa_handler = sig_usr1_handler;
353   sigaction (SIGUSR1, &sa_usr1, NULL);
354
355   memset (&sa_usr2, 0, sizeof (sa_usr2));
356   sa_usr2.sa_handler = sig_usr2_handler;
357   sigaction (SIGUSR2, &sa_usr2, NULL);
358
359 } /* }}} void install_signal_handlers */
360
361 static int open_pidfile(char *action, int oflag) /* {{{ */
362 {
363   int fd;
364   const char *file;
365   char *file_copy, *dir;
366
367   file = (config_pid_file != NULL)
368     ? config_pid_file
369     : LOCALSTATEDIR "/run/rrdcached.pid";
370
371   /* dirname may modify its argument */
372   file_copy = strdup(file);
373   if (file_copy == NULL)
374   {
375     fprintf(stderr, "rrdcached: strdup(): %s\n",
376         rrd_strerror(errno));
377     return -1;
378   }
379
380   dir = dirname(file_copy);
381   if (rrd_mkdir_p(dir, 0777) != 0)
382   {
383     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384         dir, rrd_strerror(errno));
385     return -1;
386   }
387
388   free(file_copy);
389
390   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391   if (fd < 0)
392     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393             action, file, rrd_strerror(errno));
394
395   return(fd);
396 } /* }}} static int open_pidfile */
397
398 /* check existing pid file to see whether a daemon is running */
399 static int check_pidfile(void)
400 {
401   int pid_fd;
402   pid_t pid;
403   char pid_str[16];
404
405   pid_fd = open_pidfile("open", O_RDWR);
406   if (pid_fd < 0)
407     return pid_fd;
408
409   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
410     return -1;
411
412   pid = atoi(pid_str);
413   if (pid <= 0)
414     return -1;
415
416   /* another running process that we can signal COULD be
417    * a competing rrdcached */
418   if (pid != getpid() && kill(pid, 0) == 0)
419   {
420     fprintf(stderr,
421             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
422     close(pid_fd);
423     return -1;
424   }
425
426   lseek(pid_fd, 0, SEEK_SET);
427   if (ftruncate(pid_fd, 0) == -1)
428   {
429     fprintf(stderr,
430             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
431     close(pid_fd);
432     return -1;
433   }
434
435   fprintf(stderr,
436           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
437           "rrdcached: starting normally.\n", pid);
438
439   return pid_fd;
440 } /* }}} static int check_pidfile */
441
442 static int write_pidfile (int fd) /* {{{ */
443 {
444   pid_t pid;
445   FILE *fh;
446
447   pid = getpid ();
448
449   fh = fdopen (fd, "w");
450   if (fh == NULL)
451   {
452     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
453     close(fd);
454     return (-1);
455   }
456
457   fprintf (fh, "%i\n", (int) pid);
458   fclose (fh);
459
460   return (0);
461 } /* }}} int write_pidfile */
462
463 static int remove_pidfile (void) /* {{{ */
464 {
465   char *file;
466   int status;
467
468   file = (config_pid_file != NULL)
469     ? config_pid_file
470     : LOCALSTATEDIR "/run/rrdcached.pid";
471
472   status = unlink (file);
473   if (status == 0)
474     return (0);
475   return (errno);
476 } /* }}} int remove_pidfile */
477
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
479 {
480   char *eol;
481
482   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483                sock->next_read - sock->next_cmd);
484
485   if (eol == NULL)
486   {
487     /* no commands left, move remainder back to front of rbuf */
488     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489             sock->next_read - sock->next_cmd);
490     sock->next_read -= sock->next_cmd;
491     sock->next_cmd = 0;
492     *len = 0;
493     return NULL;
494   }
495   else
496   {
497     char *cmd = sock->rbuf + sock->next_cmd;
498     *eol = '\0';
499
500     sock->next_cmd = eol - sock->rbuf + 1;
501
502     if (eol > sock->rbuf && *(eol-1) == '\r')
503       *(--eol) = '\0'; /* handle "\r\n" EOL */
504
505     *len = eol - cmd;
506
507     return cmd;
508   }
509
510   /* NOTREACHED */
511   assert(1==0);
512 } /* }}} char *next_cmd */
513
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
516 {
517   char *new_buf;
518
519   assert(sock != NULL);
520
521   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522   if (new_buf == NULL)
523   {
524     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525     return -1;
526   }
527
528   strncpy(new_buf + sock->wbuf_len, str, len + 1);
529
530   sock->wbuf = new_buf;
531   sock->wbuf_len += len;
532
533   return 0;
534 } /* }}} static int add_to_wbuf */
535
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
538 {
539   va_list argp;
540   char buffer[CMD_MAX];
541   int len;
542
543   if (JOURNAL_REPLAY(sock)) return 0;
544   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
545
546   va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550   len = vsprintf(buffer, fmt, argp);
551 #endif
552   va_end(argp);
553   if (len < 0)
554   {
555     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556     return -1;
557   }
558
559   return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
561
562 static int count_lines(char *str) /* {{{ */
563 {
564   int lines = 0;
565
566   if (str != NULL)
567   {
568     while ((str = strchr(str, '\n')) != NULL)
569     {
570       ++lines;
571       ++str;
572     }
573   }
574
575   return lines;
576 } /* }}} static int count_lines */
577
578 /* send the response back to the user.
579  * returns 0 on success, -1 on error
580  * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582                           char *fmt, ...) /* {{{ */
583 {
584   va_list argp;
585   char buffer[CMD_MAX];
586   int lines;
587   ssize_t wrote;
588   int rclen, len;
589
590   if (JOURNAL_REPLAY(sock)) return rc;
591
592   if (sock->batch_start)
593   {
594     if (rc == RESP_OK)
595       return rc; /* no response on success during BATCH */
596     lines = sock->batch_cmd;
597   }
598   else if (rc == RESP_OK)
599     lines = count_lines(sock->wbuf);
600   else
601     lines = -1;
602
603   rclen = sprintf(buffer, "%d ", lines);
604   va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608   len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610   va_end(argp);
611   if (len < 0)
612     return -1;
613
614   len += rclen;
615
616   /* append the result to the wbuf, don't write to the user */
617   if (sock->batch_start)
618     return add_to_wbuf(sock, buffer, len);
619
620   /* first write must be complete */
621   if (len != write(sock->fd, buffer, len))
622   {
623     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624     return -1;
625   }
626
627   if (sock->wbuf != NULL && rc == RESP_OK)
628   {
629     wrote = 0;
630     while (wrote < sock->wbuf_len)
631     {
632       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633       if (wb <= 0)
634       {
635         RRDD_LOG(LOG_INFO, "send_response: could not write results");
636         return -1;
637       }
638       wrote += wb;
639     }
640   }
641
642   free(sock->wbuf); sock->wbuf = NULL;
643   sock->wbuf_len = 0;
644
645   return 0;
646 } /* }}} */
647
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
649 {
650   ci->values = NULL;
651   ci->values_num = 0;
652   ci->values_alloc = 0;
653
654   ci->last_flush_time = when;
655   if (config_write_jitter > 0)
656     ci->last_flush_time += (rrd_random() % config_write_jitter);
657 }
658
659 /* remove_from_queue
660  * remove a "cache_item_t" item from the queue.
661  * must hold 'cache_lock' when calling this
662  */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
664 {
665   if (ci == NULL) return;
666   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
667
668   if (ci->prev == NULL)
669     cache_queue_head = ci->next; /* reset head */
670   else
671     ci->prev->next = ci->next;
672
673   if (ci->next == NULL)
674     cache_queue_tail = ci->prev; /* reset the tail */
675   else
676     ci->next->prev = ci->prev;
677
678   ci->next = ci->prev = NULL;
679   ci->flags &= ~CI_FLAGS_IN_QUEUE;
680
681   pthread_mutex_lock (&stats_lock);
682   assert (stats_queue_length > 0);
683   stats_queue_length--;
684   pthread_mutex_unlock (&stats_lock);
685
686 } /* }}} static void remove_from_queue */
687
688 /* free the resources associated with the cache_item_t
689  * must hold cache_lock when calling this function
690  */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
692 {
693   if (ci == NULL) return NULL;
694
695   remove_from_queue(ci);
696
697   for (size_t i=0; i < ci->values_num; i++)
698     free(ci->values[i]);
699
700   free (ci->values);
701   free (ci->file);
702
703   /* in case anyone is waiting */
704   pthread_cond_broadcast(&ci->flushed);
705   pthread_cond_destroy(&ci->flushed);
706
707   free (ci);
708
709   return NULL;
710 } /* }}} static void *free_cache_item */
711
712 /*
713  * enqueue_cache_item:
714  * `cache_lock' must be acquired before calling this function!
715  */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717     queue_side_t side)
718 {
719   if (ci == NULL)
720     return (-1);
721
722   if (ci->values_num == 0)
723     return (0);
724
725   if (side == HEAD)
726   {
727     if (cache_queue_head == ci)
728       return 0;
729
730     /* remove if further down in queue */
731     remove_from_queue(ci);
732
733     ci->prev = NULL;
734     ci->next = cache_queue_head;
735     if (ci->next != NULL)
736       ci->next->prev = ci;
737     cache_queue_head = ci;
738
739     if (cache_queue_tail == NULL)
740       cache_queue_tail = cache_queue_head;
741   }
742   else /* (side == TAIL) */
743   {
744     /* We don't move values back in the list.. */
745     if (ci->flags & CI_FLAGS_IN_QUEUE)
746       return (0);
747
748     assert (ci->next == NULL);
749     assert (ci->prev == NULL);
750
751     ci->prev = cache_queue_tail;
752
753     if (cache_queue_tail == NULL)
754       cache_queue_head = ci;
755     else
756       cache_queue_tail->next = ci;
757
758     cache_queue_tail = ci;
759   }
760
761   ci->flags |= CI_FLAGS_IN_QUEUE;
762
763   pthread_cond_signal(&queue_cond);
764   pthread_mutex_lock (&stats_lock);
765   stats_queue_length++;
766   pthread_mutex_unlock (&stats_lock);
767
768   return (0);
769 } /* }}} int enqueue_cache_item */
770
771 /*
772  * tree_callback_flush:
773  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774  * while this is in progress.
775  */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777     gpointer data)
778 {
779   cache_item_t *ci;
780   callback_flush_data_t *cfd;
781
782   ci = (cache_item_t *) value;
783   cfd = (callback_flush_data_t *) data;
784
785   if (ci->flags & CI_FLAGS_IN_QUEUE)
786     return FALSE;
787
788   if (ci->values_num > 0
789       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790   {
791     enqueue_cache_item (ci, TAIL);
792   }
793   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794       && (ci->values_num <= 0))
795   {
796     assert ((char *) key == ci->file);
797     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798     {
799       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800       return (FALSE);
801     }
802   }
803
804   return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
806
807 static int flush_old_values (int max_age)
808 {
809   callback_flush_data_t cfd;
810   size_t k;
811
812   memset (&cfd, 0, sizeof (cfd));
813   /* Pass the current time as user data so that we don't need to call
814    * `time' for each node. */
815   cfd.now = time (NULL);
816   cfd.keys = NULL;
817   cfd.keys_num = 0;
818
819   if (max_age > 0)
820     cfd.abs_timeout = cfd.now - max_age;
821   else
822     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
823
824   /* `tree_callback_flush' will return the keys of all values that haven't
825    * been touched in the last `config_flush_interval' seconds in `cfd'.
826    * The char*'s in this array point to the same memory as ci->file, so we
827    * don't need to free them separately. */
828   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
829
830   for (k = 0; k < cfd.keys_num; k++)
831   {
832     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833     /* should never fail, since we have held the cache_lock
834      * the entire time */
835     assert(status == TRUE);
836   }
837
838   if (cfd.keys != NULL)
839   {
840     free (cfd.keys);
841     cfd.keys = NULL;
842   }
843
844   return (0);
845 } /* int flush_old_values */
846
847 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
848 {
849   struct timeval now;
850   struct timespec next_flush;
851   int status;
852
853   gettimeofday (&now, NULL);
854   next_flush.tv_sec = now.tv_sec + config_flush_interval;
855   next_flush.tv_nsec = 1000 * now.tv_usec;
856
857   pthread_mutex_lock(&cache_lock);
858
859   while (state == RUNNING)
860   {
861     gettimeofday (&now, NULL);
862     if ((now.tv_sec > next_flush.tv_sec)
863         || ((now.tv_sec == next_flush.tv_sec)
864           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865     {
866       RRDD_LOG(LOG_DEBUG, "flushing old values");
867
868       /* Determine the time of the next cache flush. */
869       next_flush.tv_sec = now.tv_sec + config_flush_interval;
870
871       /* Flush all values that haven't been written in the last
872        * `config_write_interval' seconds. */
873       flush_old_values (config_write_interval);
874
875       /* unlock the cache while we rotate so we don't block incoming
876        * updates if the fsync() blocks on disk I/O */
877       pthread_mutex_unlock(&cache_lock);
878       journal_rotate();
879       pthread_mutex_lock(&cache_lock);
880     }
881
882     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
883     if (status != 0 && status != ETIMEDOUT)
884     {
885       RRDD_LOG (LOG_ERR, "flush_thread_main: "
886                 "pthread_cond_timedwait returned %i.", status);
887     }
888   }
889
890   if (config_flush_at_shutdown)
891     flush_old_values (-1); /* flush everything */
892
893   state = SHUTDOWN;
894
895   pthread_mutex_unlock(&cache_lock);
896
897   return NULL;
898 } /* void *flush_thread_main */
899
900 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
901 {
902   pthread_mutex_lock (&cache_lock);
903
904   while (state != SHUTDOWN
905          || (cache_queue_head != NULL && config_flush_at_shutdown))
906   {
907     cache_item_t *ci;
908     char *file;
909     char **values;
910     size_t values_num;
911     int status;
912
913     /* Now, check if there's something to store away. If not, wait until
914      * something comes in. */
915     if (cache_queue_head == NULL)
916     {
917       status = pthread_cond_wait (&queue_cond, &cache_lock);
918       if ((status != 0) && (status != ETIMEDOUT))
919       {
920         RRDD_LOG (LOG_ERR, "queue_thread_main: "
921             "pthread_cond_wait returned %i.", status);
922       }
923     }
924
925     /* Check if a value has arrived. This may be NULL if we timed out or there
926      * was an interrupt such as a signal. */
927     if (cache_queue_head == NULL)
928       continue;
929
930     ci = cache_queue_head;
931
932     /* copy the relevant parts */
933     file = strdup (ci->file);
934     if (file == NULL)
935     {
936       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
937       continue;
938     }
939
940     assert(ci->values != NULL);
941     assert(ci->values_num > 0);
942
943     values = ci->values;
944     values_num = ci->values_num;
945
946     wipe_ci_values(ci, time(NULL));
947     remove_from_queue(ci);
948
949     pthread_mutex_unlock (&cache_lock);
950
951     rrd_clear_error ();
952     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
953     if (status != 0)
954     {
955       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
956           "rrd_update_r (%s) failed with status %i. (%s)",
957           file, status, rrd_get_error());
958     }
959
960     journal_write("wrote", file);
961
962     /* Search again in the tree.  It's possible someone issued a "FORGET"
963      * while we were writing the update values. */
964     pthread_mutex_lock(&cache_lock);
965     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
966     if (ci)
967       pthread_cond_broadcast(&ci->flushed);
968     pthread_mutex_unlock(&cache_lock);
969
970     if (status == 0)
971     {
972       pthread_mutex_lock (&stats_lock);
973       stats_updates_written++;
974       stats_data_sets_written += values_num;
975       pthread_mutex_unlock (&stats_lock);
976     }
977
978     rrd_free_ptrs((void ***) &values, &values_num);
979     free(file);
980
981     pthread_mutex_lock (&cache_lock);
982   }
983   pthread_mutex_unlock (&cache_lock);
984
985   return (NULL);
986 } /* }}} void *queue_thread_main */
987
988 static int buffer_get_field (char **buffer_ret, /* {{{ */
989     size_t *buffer_size_ret, char **field_ret)
990 {
991   char *buffer;
992   size_t buffer_pos;
993   size_t buffer_size;
994   char *field;
995   size_t field_size;
996   int status;
997
998   buffer = *buffer_ret;
999   buffer_pos = 0;
1000   buffer_size = *buffer_size_ret;
1001   field = *buffer_ret;
1002   field_size = 0;
1003
1004   if (buffer_size <= 0)
1005     return (-1);
1006
1007   /* This is ensured by `handle_request'. */
1008   assert (buffer[buffer_size - 1] == '\0');
1009
1010   status = -1;
1011   while (buffer_pos < buffer_size)
1012   {
1013     /* Check for end-of-field or end-of-buffer */
1014     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1015     {
1016       field[field_size] = 0;
1017       field_size++;
1018       buffer_pos++;
1019       status = 0;
1020       break;
1021     }
1022     /* Handle escaped characters. */
1023     else if (buffer[buffer_pos] == '\\')
1024     {
1025       if (buffer_pos >= (buffer_size - 1))
1026         break;
1027       buffer_pos++;
1028       field[field_size] = buffer[buffer_pos];
1029       field_size++;
1030       buffer_pos++;
1031     }
1032     /* Normal operation */ 
1033     else
1034     {
1035       field[field_size] = buffer[buffer_pos];
1036       field_size++;
1037       buffer_pos++;
1038     }
1039   } /* while (buffer_pos < buffer_size) */
1040
1041   if (status != 0)
1042     return (status);
1043
1044   *buffer_ret = buffer + buffer_pos;
1045   *buffer_size_ret = buffer_size - buffer_pos;
1046   *field_ret = field;
1047
1048   return (0);
1049 } /* }}} int buffer_get_field */
1050
1051 /* if we're restricting writes to the base directory,
1052  * check whether the file falls within the dir
1053  * returns 1 if OK, otherwise 0
1054  */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1056 {
1057   assert(file != NULL);
1058
1059   if (!config_write_base_only
1060       || JOURNAL_REPLAY(sock)
1061       || config_base_dir == NULL)
1062     return 1;
1063
1064   if (strstr(file, "../") != NULL) goto err;
1065
1066   /* relative paths without "../" are ok */
1067   if (*file != '/') return 1;
1068
1069   /* file must be of the format base + "/" + <1+ char filename> */
1070   if (strlen(file) < _config_base_dir_len + 2) goto err;
1071   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072   if (*(file + _config_base_dir_len) != '/') goto err;
1073
1074   return 1;
1075
1076 err:
1077   if (sock != NULL && sock->fd >= 0)
1078     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1079
1080   return 0;
1081 } /* }}} static int check_file_access */
1082
1083 /* when using a base dir, convert relative paths to absolute paths.
1084  * if necessary, modifies the "filename" pointer to point
1085  * to the new path created in "tmp".  "tmp" is provided
1086  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087  *
1088  * this allows us to optimize for the expected case (absolute path)
1089  * with a no-op.
1090  */
1091 static void get_abs_path(char **filename, char *tmp)
1092 {
1093   assert(tmp != NULL);
1094   assert(filename != NULL && *filename != NULL);
1095
1096   if (config_base_dir == NULL || **filename == '/')
1097     return;
1098
1099   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100   *filename = tmp;
1101 } /* }}} static int get_abs_path */
1102
1103 static int flush_file (const char *filename) /* {{{ */
1104 {
1105   cache_item_t *ci;
1106
1107   pthread_mutex_lock (&cache_lock);
1108
1109   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110   if (ci == NULL)
1111   {
1112     pthread_mutex_unlock (&cache_lock);
1113     return (ENOENT);
1114   }
1115
1116   if (ci->values_num > 0)
1117   {
1118     /* Enqueue at head */
1119     enqueue_cache_item (ci, HEAD);
1120     pthread_cond_wait(&ci->flushed, &cache_lock);
1121   }
1122
1123   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1124    * may have been purged during our cond_wait() */
1125
1126   pthread_mutex_unlock(&cache_lock);
1127
1128   return (0);
1129 } /* }}} int flush_file */
1130
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1132 {
1133   char *err = "Syntax error.\n";
1134
1135   if (cmd && cmd->syntax)
1136     err = cmd->syntax;
1137
1138   return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1140
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1142 {
1143   uint64_t copy_queue_length;
1144   uint64_t copy_updates_received;
1145   uint64_t copy_flush_received;
1146   uint64_t copy_updates_written;
1147   uint64_t copy_data_sets_written;
1148   uint64_t copy_journal_bytes;
1149   uint64_t copy_journal_rotate;
1150
1151   uint64_t tree_nodes_number;
1152   uint64_t tree_depth;
1153
1154   pthread_mutex_lock (&stats_lock);
1155   copy_queue_length       = stats_queue_length;
1156   copy_updates_received   = stats_updates_received;
1157   copy_flush_received     = stats_flush_received;
1158   copy_updates_written    = stats_updates_written;
1159   copy_data_sets_written  = stats_data_sets_written;
1160   copy_journal_bytes      = stats_journal_bytes;
1161   copy_journal_rotate     = stats_journal_rotate;
1162   pthread_mutex_unlock (&stats_lock);
1163
1164   pthread_mutex_lock (&cache_lock);
1165   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1167   pthread_mutex_unlock (&cache_lock);
1168
1169   add_response_info(sock,
1170                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1171   add_response_info(sock,
1172                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173   add_response_info(sock,
1174                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175   add_response_info(sock,
1176                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177   add_response_info(sock,
1178                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1183
1184   send_response(sock, RESP_OK, "Statistics follow\n");
1185
1186   return (0);
1187 } /* }}} int handle_request_stats */
1188
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1190 {
1191   char *file, file_tmp[PATH_MAX];
1192   int status;
1193
1194   status = buffer_get_field (&buffer, &buffer_size, &file);
1195   if (status != 0)
1196   {
1197     return syntax_error(sock,cmd);
1198   }
1199   else
1200   {
1201     pthread_mutex_lock(&stats_lock);
1202     stats_flush_received++;
1203     pthread_mutex_unlock(&stats_lock);
1204
1205     get_abs_path(&file, file_tmp);
1206     if (!check_file_access(file, sock)) return 0;
1207
1208     status = flush_file (file);
1209     if (status == 0)
1210       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211     else if (status == ENOENT)
1212     {
1213       /* no file in our tree; see whether it exists at all */
1214       struct stat statbuf;
1215
1216       memset(&statbuf, 0, sizeof(statbuf));
1217       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219       else
1220         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221     }
1222     else if (status < 0)
1223       return send_response(sock, RESP_ERR, "Internal error.\n");
1224     else
1225       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226   }
1227
1228   /* NOTREACHED */
1229   assert(1==0);
1230 } /* }}} int handle_request_flush */
1231
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1233 {
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1239
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1242
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1244 {
1245   int status;
1246   char *file, file_tmp[PATH_MAX];
1247   cache_item_t *ci;
1248
1249   status = buffer_get_field(&buffer, &buffer_size, &file);
1250   if (status != 0)
1251     return syntax_error(sock,cmd);
1252
1253   get_abs_path(&file, file_tmp);
1254
1255   pthread_mutex_lock(&cache_lock);
1256   ci = g_tree_lookup(cache_tree, file);
1257   if (ci == NULL)
1258   {
1259     pthread_mutex_unlock(&cache_lock);
1260     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261   }
1262
1263   for (size_t i=0; i < ci->values_num; i++)
1264     add_response_info(sock, "%s\n", ci->values[i]);
1265
1266   pthread_mutex_unlock(&cache_lock);
1267   return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1269
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1271 {
1272   int status;
1273   gboolean found;
1274   char *file, file_tmp[PATH_MAX];
1275
1276   status = buffer_get_field(&buffer, &buffer_size, &file);
1277   if (status != 0)
1278     return syntax_error(sock,cmd);
1279
1280   get_abs_path(&file, file_tmp);
1281   if (!check_file_access(file, sock)) return 0;
1282
1283   pthread_mutex_lock(&cache_lock);
1284   found = g_tree_remove(cache_tree, file);
1285   pthread_mutex_unlock(&cache_lock);
1286
1287   if (found == TRUE)
1288   {
1289     if (!JOURNAL_REPLAY(sock))
1290       journal_write("forget", file);
1291
1292     return send_response(sock, RESP_OK, "Gone!\n");
1293   }
1294   else
1295     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1296
1297   /* NOTREACHED */
1298   assert(1==0);
1299 } /* }}} static int handle_request_forget */
1300
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1302 {
1303   cache_item_t *ci;
1304
1305   pthread_mutex_lock(&cache_lock);
1306
1307   ci = cache_queue_head;
1308   while (ci != NULL)
1309   {
1310     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311     ci = ci->next;
1312   }
1313
1314   pthread_mutex_unlock(&cache_lock);
1315
1316   return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1318
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1320 {
1321   char *file, file_tmp[PATH_MAX];
1322   int values_num = 0;
1323   int status;
1324   char orig_buf[CMD_MAX];
1325
1326   cache_item_t *ci;
1327
1328   /* save it for the journal later */
1329   if (!JOURNAL_REPLAY(sock))
1330     strncpy(orig_buf, buffer, buffer_size);
1331
1332   status = buffer_get_field (&buffer, &buffer_size, &file);
1333   if (status != 0)
1334     return syntax_error(sock,cmd);
1335
1336   pthread_mutex_lock(&stats_lock);
1337   stats_updates_received++;
1338   pthread_mutex_unlock(&stats_lock);
1339
1340   get_abs_path(&file, file_tmp);
1341   if (!check_file_access(file, sock)) return 0;
1342
1343   pthread_mutex_lock (&cache_lock);
1344   ci = g_tree_lookup (cache_tree, file);
1345
1346   if (ci == NULL) /* {{{ */
1347   {
1348     struct stat statbuf;
1349     cache_item_t *tmp;
1350
1351     /* don't hold the lock while we setup; stat(2) might block */
1352     pthread_mutex_unlock(&cache_lock);
1353
1354     memset (&statbuf, 0, sizeof (statbuf));
1355     status = stat (file, &statbuf);
1356     if (status != 0)
1357     {
1358       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1359
1360       status = errno;
1361       if (status == ENOENT)
1362         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363       else
1364         return send_response(sock, RESP_ERR,
1365                              "stat failed with error %i.\n", status);
1366     }
1367     if (!S_ISREG (statbuf.st_mode))
1368       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1369
1370     if (access(file, R_OK|W_OK) != 0)
1371       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372                            file, rrd_strerror(errno));
1373
1374     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375     if (ci == NULL)
1376     {
1377       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1378
1379       return send_response(sock, RESP_ERR, "malloc failed.\n");
1380     }
1381     memset (ci, 0, sizeof (cache_item_t));
1382
1383     ci->file = strdup (file);
1384     if (ci->file == NULL)
1385     {
1386       free (ci);
1387       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1388
1389       return send_response(sock, RESP_ERR, "strdup failed.\n");
1390     }
1391
1392     wipe_ci_values(ci, now);
1393     ci->flags = CI_FLAGS_IN_TREE;
1394     pthread_cond_init(&ci->flushed, NULL);
1395
1396     pthread_mutex_lock(&cache_lock);
1397
1398     /* another UPDATE might have added this entry in the meantime */
1399     tmp = g_tree_lookup (cache_tree, file);
1400     if (tmp == NULL)
1401       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402     else
1403     {
1404       free_cache_item (ci);
1405       ci = tmp;
1406     }
1407
1408     /* state may have changed while we were unlocked */
1409     if (state == SHUTDOWN)
1410       return -1;
1411   } /* }}} */
1412   assert (ci != NULL);
1413
1414   /* don't re-write updates in replay mode */
1415   if (!JOURNAL_REPLAY(sock))
1416     journal_write("update", orig_buf);
1417
1418   while (buffer_size > 0)
1419   {
1420     char *value;
1421     time_t stamp;
1422     char *eostamp;
1423
1424     status = buffer_get_field (&buffer, &buffer_size, &value);
1425     if (status != 0)
1426     {
1427       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428       break;
1429     }
1430
1431     /* make sure update time is always moving forward */
1432     stamp = strtol(value, &eostamp, 10);
1433     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "Cannot find timestamp in '%s'!\n", value);
1438     }
1439     else if (stamp <= ci->last_update_stamp)
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "illegal attempt to update using time %ld when last"
1444                            " update time is %ld (minimum one second step)\n",
1445                            stamp, ci->last_update_stamp);
1446     }
1447     else
1448       ci->last_update_stamp = stamp;
1449
1450     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451                               &ci->values_alloc, config_alloc_chunk))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1456
1457     values_num++;
1458   }
1459
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1466
1467   pthread_mutex_unlock (&cache_lock);
1468
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1474
1475   /* NOTREACHED */
1476   assert(1==0);
1477
1478 } /* }}} int handle_request_update */
1479
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1481 {
1482   char *file;
1483   char *cf;
1484
1485   char *start_str;
1486   char *end_str;
1487   rrd_time_value_t start_tv;
1488   rrd_time_value_t end_tv;
1489   time_t start_tm;
1490   time_t end_tm;
1491
1492   unsigned long step;
1493   unsigned long ds_cnt;
1494   char **ds_namv;
1495   rrd_value_t *data;
1496
1497   int status;
1498   unsigned long i;
1499   time_t t;
1500   rrd_value_t *data_ptr;
1501
1502   file = NULL;
1503   cf = NULL;
1504   start_str = NULL;
1505   end_str = NULL;
1506
1507   /* Read the arguments */
1508   do /* while (0) */
1509   {
1510     status = buffer_get_field (&buffer, &buffer_size, &file);
1511     if (status != 0)
1512       break;
1513
1514     status = buffer_get_field (&buffer, &buffer_size, &cf);
1515     if (status != 0)
1516       break;
1517
1518     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1519     if (status != 0)
1520     {
1521       start_str = NULL;
1522       status = 0;
1523       break;
1524     }
1525
1526     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1527     if (status != 0)
1528     {
1529       end_str = NULL;
1530       status = 0;
1531       break;
1532     }
1533   } while (0);
1534
1535   if (status != 0)
1536     return (syntax_error(sock,cmd));
1537
1538   status = flush_file (file);
1539   if ((status != 0) && (status != ENOENT))
1540     return (send_response (sock, RESP_ERR,
1541           "flush_file (%s) failed with status %i.\n", file, status));
1542
1543   /* Parse start time */
1544   if (start_str != NULL)
1545   {
1546     const char *errmsg;
1547
1548     errmsg = rrd_parsetime (start_str, &start_tv);
1549     if (errmsg != NULL)
1550       return (send_response(sock, RESP_ERR,
1551             "Cannot parse start time `%s': %s\n", start_str, errmsg));
1552   }
1553   else
1554     rrd_parsetime ("-86400", &start_tv);
1555
1556   /* Parse end time */
1557   if (end_str != NULL)
1558   {
1559     const char *errmsg;
1560
1561     errmsg = rrd_parsetime (end_str, &end_tv);
1562     if (errmsg != NULL)
1563       return (send_response(sock, RESP_ERR,
1564             "Cannot parse end time `%s': %s\n", end_str, errmsg));
1565   }
1566   else
1567     rrd_parsetime ("now", &end_tv);
1568
1569   start_tm = 0;
1570   end_tm = 0;
1571   status = rrd_proc_start_end (&start_tv, &end_tv, &start_tm, &end_tm);
1572   if (status != 0)
1573     return (send_response(sock, RESP_ERR,
1574           "rrd_proc_start_end failed: %s\n", rrd_get_error ()));
1575
1576   step = -1;
1577   ds_cnt = 0;
1578   ds_namv = NULL;
1579   data = NULL;
1580
1581   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1582       &ds_cnt, &ds_namv, &data);
1583   if (status != 0)
1584     return (send_response(sock, RESP_ERR,
1585           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1586
1587   add_response_info (sock, "FlushVersion: %lu\n", 1);
1588   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1589   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1590   add_response_info (sock, "Step: %lu\n", step);
1591   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1592
1593 #define SSTRCAT(buffer,str,buffer_fill) do { \
1594     size_t str_len = strlen (str); \
1595     if ((buffer_fill + str_len) > sizeof (buffer)) \
1596       str_len = sizeof (buffer) - buffer_fill; \
1597     if (str_len > 0) { \
1598       strncpy (buffer + buffer_fill, str, str_len); \
1599       buffer_fill += str_len; \
1600       assert (buffer_fill <= sizeof (buffer)); \
1601       if (buffer_fill == sizeof (buffer)) \
1602         buffer[buffer_fill - 1] = 0; \
1603       else \
1604         buffer[buffer_fill] = 0; \
1605     } \
1606   } while (0)
1607
1608   { /* Add list of DS names */
1609     char linebuf[1024];
1610     size_t linebuf_fill;
1611
1612     memset (linebuf, 0, sizeof (linebuf));
1613     linebuf_fill = 0;
1614     for (i = 0; i < ds_cnt; i++)
1615     {
1616       if (i > 0)
1617         SSTRCAT (linebuf, " ", linebuf_fill);
1618       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1619     }
1620     add_response_info (sock, "DSName: %s\n", linebuf);
1621   }
1622
1623   /* Add the actual data */
1624   assert (step > 0);
1625   data_ptr = data;
1626   for (t = start_tm + step; t <= end_tm; t += step)
1627   {
1628     char linebuf[1024];
1629     size_t linebuf_fill;
1630     char tmp[128];
1631
1632     memset (linebuf, 0, sizeof (linebuf));
1633     linebuf_fill = 0;
1634     for (i = 0; i < ds_cnt; i++)
1635     {
1636       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1637       tmp[sizeof (tmp) - 1] = 0;
1638       SSTRCAT (linebuf, tmp, linebuf_fill);
1639
1640       data_ptr++;
1641     }
1642
1643     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1644   } /* for (t) */
1645
1646   return (send_response (sock, RESP_OK, "Success\n"));
1647 #undef SSTRCAT
1648 } /* }}} int handle_request_fetch */
1649
1650 /* we came across a "WROTE" entry during journal replay.
1651  * throw away any values that we have accumulated for this file
1652  */
1653 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1654 {
1655   cache_item_t *ci;
1656   const char *file = buffer;
1657
1658   pthread_mutex_lock(&cache_lock);
1659
1660   ci = g_tree_lookup(cache_tree, file);
1661   if (ci == NULL)
1662   {
1663     pthread_mutex_unlock(&cache_lock);
1664     return (0);
1665   }
1666
1667   if (ci->values)
1668     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1669
1670   wipe_ci_values(ci, now);
1671   remove_from_queue(ci);
1672
1673   pthread_mutex_unlock(&cache_lock);
1674   return (0);
1675 } /* }}} int handle_request_wrote */
1676
1677 /* start "BATCH" processing */
1678 static int batch_start (HANDLER_PROTO) /* {{{ */
1679 {
1680   int status;
1681   if (sock->batch_start)
1682     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1683
1684   status = send_response(sock, RESP_OK,
1685                          "Go ahead.  End with dot '.' on its own line.\n");
1686   sock->batch_start = time(NULL);
1687   sock->batch_cmd = 0;
1688
1689   return status;
1690 } /* }}} static int batch_start */
1691
1692 /* finish "BATCH" processing and return results to the client */
1693 static int batch_done (HANDLER_PROTO) /* {{{ */
1694 {
1695   assert(sock->batch_start);
1696   sock->batch_start = 0;
1697   sock->batch_cmd  = 0;
1698   return send_response(sock, RESP_OK, "errors\n");
1699 } /* }}} static int batch_done */
1700
1701 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1702 {
1703   return -1;
1704 } /* }}} static int handle_request_quit */
1705
1706 static command_t list_of_commands[] = { /* {{{ */
1707   {
1708     "UPDATE",
1709     handle_request_update,
1710     CMD_CONTEXT_ANY,
1711     "UPDATE <filename> <values> [<values> ...]\n"
1712     ,
1713     "Adds the given file to the internal cache if it is not yet known and\n"
1714     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1715     "for details.\n"
1716     "\n"
1717     "Each <values> has the following form:\n"
1718     "  <values> = <time>:<value>[:<value>[...]]\n"
1719     "See the rrdupdate(1) manpage for details.\n"
1720   },
1721   {
1722     "WROTE",
1723     handle_request_wrote,
1724     CMD_CONTEXT_JOURNAL,
1725     NULL,
1726     NULL
1727   },
1728   {
1729     "FLUSH",
1730     handle_request_flush,
1731     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1732     "FLUSH <filename>\n"
1733     ,
1734     "Adds the given filename to the head of the update queue and returns\n"
1735     "after it has been dequeued.\n"
1736   },
1737   {
1738     "FLUSHALL",
1739     handle_request_flushall,
1740     CMD_CONTEXT_CLIENT,
1741     "FLUSHALL\n"
1742     ,
1743     "Triggers writing of all pending updates.  Returns immediately.\n"
1744   },
1745   {
1746     "PENDING",
1747     handle_request_pending,
1748     CMD_CONTEXT_CLIENT,
1749     "PENDING <filename>\n"
1750     ,
1751     "Shows any 'pending' updates for a file, in order.\n"
1752     "The updates shown have not yet been written to the underlying RRD file.\n"
1753   },
1754   {
1755     "FORGET",
1756     handle_request_forget,
1757     CMD_CONTEXT_ANY,
1758     "FORGET <filename>\n"
1759     ,
1760     "Removes the file completely from the cache.\n"
1761     "Any pending updates for the file will be lost.\n"
1762   },
1763   {
1764     "QUEUE",
1765     handle_request_queue,
1766     CMD_CONTEXT_CLIENT,
1767     "QUEUE\n"
1768     ,
1769         "Shows all files in the output queue.\n"
1770     "The output is zero or more lines in the following format:\n"
1771     "(where <num_vals> is the number of values to be written)\n"
1772     "\n"
1773     "<num_vals> <filename>\n"
1774   },
1775   {
1776     "STATS",
1777     handle_request_stats,
1778     CMD_CONTEXT_CLIENT,
1779     "STATS\n"
1780     ,
1781     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1782     "a description of the values.\n"
1783   },
1784   {
1785     "HELP",
1786     handle_request_help,
1787     CMD_CONTEXT_CLIENT,
1788     "HELP [<command>]\n",
1789     NULL, /* special! */
1790   },
1791   {
1792     "BATCH",
1793     batch_start,
1794     CMD_CONTEXT_CLIENT,
1795     "BATCH\n"
1796     ,
1797     "The 'BATCH' command permits the client to initiate a bulk load\n"
1798     "   of commands to rrdcached.\n"
1799     "\n"
1800     "Usage:\n"
1801     "\n"
1802     "    client: BATCH\n"
1803     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1804     "    client: command #1\n"
1805     "    client: command #2\n"
1806     "    client: ... and so on\n"
1807     "    client: .\n"
1808     "    server: 2 errors\n"
1809     "    server: 7 message for command #7\n"
1810     "    server: 9 message for command #9\n"
1811     "\n"
1812     "For more information, consult the rrdcached(1) documentation.\n"
1813   },
1814   {
1815     ".",   /* BATCH terminator */
1816     batch_done,
1817     CMD_CONTEXT_BATCH,
1818     NULL,
1819     NULL
1820   },
1821   {
1822     "FETCH",
1823     handle_request_fetch,
1824     CMD_CONTEXT_CLIENT,
1825     "FETCH <file> <CF> [<start> [<end>]]\n"
1826     ,
1827     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1828   },
1829   {
1830     "QUIT",
1831     handle_request_quit,
1832     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1833     "QUIT\n"
1834     ,
1835     "Disconnect from rrdcached.\n"
1836   }
1837 }; /* }}} command_t list_of_commands[] */
1838 static size_t list_of_commands_len = sizeof (list_of_commands)
1839   / sizeof (list_of_commands[0]);
1840
1841 static command_t *find_command(char *cmd)
1842 {
1843   size_t i;
1844
1845   for (i = 0; i < list_of_commands_len; i++)
1846     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1847       return (&list_of_commands[i]);
1848   return NULL;
1849 }
1850
1851 /* We currently use the index in the `list_of_commands' array as a bit position
1852  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1853  * outside these functions so that switching to a more elegant storage method
1854  * is easily possible. */
1855 static ssize_t find_command_index (const char *cmd) /* {{{ */
1856 {
1857   size_t i;
1858
1859   for (i = 0; i < list_of_commands_len; i++)
1860     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1861       return ((ssize_t) i);
1862   return (-1);
1863 } /* }}} ssize_t find_command_index */
1864
1865 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1866     const char *cmd)
1867 {
1868   ssize_t i;
1869
1870   if (JOURNAL_REPLAY(sock))
1871     return (1);
1872
1873   if (cmd == NULL)
1874     return (-1);
1875
1876   if ((strcasecmp ("QUIT", cmd) == 0)
1877       || (strcasecmp ("HELP", cmd) == 0))
1878     return (1);
1879   else if (strcmp (".", cmd) == 0)
1880     cmd = "BATCH";
1881
1882   i = find_command_index (cmd);
1883   if (i < 0)
1884     return (-1);
1885   assert (i < 32);
1886
1887   if ((sock->permissions & (1 << i)) != 0)
1888     return (1);
1889   return (0);
1890 } /* }}} int socket_permission_check */
1891
1892 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1893     const char *cmd)
1894 {
1895   ssize_t i;
1896
1897   i = find_command_index (cmd);
1898   if (i < 0)
1899     return (-1);
1900   assert (i < 32);
1901
1902   sock->permissions |= (1 << i);
1903   return (0);
1904 } /* }}} int socket_permission_add */
1905
1906 /* check whether commands are received in the expected context */
1907 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1908 {
1909   if (JOURNAL_REPLAY(sock))
1910     return (cmd->context & CMD_CONTEXT_JOURNAL);
1911   else if (sock->batch_start)
1912     return (cmd->context & CMD_CONTEXT_BATCH);
1913   else
1914     return (cmd->context & CMD_CONTEXT_CLIENT);
1915
1916   /* NOTREACHED */
1917   assert(1==0);
1918 }
1919
1920 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1921 {
1922   int status;
1923   char *cmd_str;
1924   char *resp_txt;
1925   command_t *help = NULL;
1926
1927   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1928   if (status == 0)
1929     help = find_command(cmd_str);
1930
1931   if (help && (help->syntax || help->help))
1932   {
1933     char tmp[CMD_MAX];
1934
1935     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1936     resp_txt = tmp;
1937
1938     if (help->syntax)
1939       add_response_info(sock, "Usage: %s\n", help->syntax);
1940
1941     if (help->help)
1942       add_response_info(sock, "%s\n", help->help);
1943   }
1944   else
1945   {
1946     size_t i;
1947
1948     resp_txt = "Command overview\n";
1949
1950     for (i = 0; i < list_of_commands_len; i++)
1951     {
1952       if (list_of_commands[i].syntax == NULL)
1953         continue;
1954       add_response_info (sock, "%s", list_of_commands[i].syntax);
1955     }
1956   }
1957
1958   return send_response(sock, RESP_OK, resp_txt);
1959 } /* }}} int handle_request_help */
1960
1961 static int handle_request (DISPATCH_PROTO) /* {{{ */
1962 {
1963   char *buffer_ptr = buffer;
1964   char *cmd_str = NULL;
1965   command_t *cmd = NULL;
1966   int status;
1967
1968   assert (buffer[buffer_size - 1] == '\0');
1969
1970   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1971   if (status != 0)
1972   {
1973     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1974     return (-1);
1975   }
1976
1977   if (sock != NULL && sock->batch_start)
1978     sock->batch_cmd++;
1979
1980   cmd = find_command(cmd_str);
1981   if (!cmd)
1982     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1983
1984   if (!socket_permission_check (sock, cmd->cmd))
1985     return send_response(sock, RESP_ERR, "Permission denied.\n");
1986
1987   if (!command_check_context(sock, cmd))
1988     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1989
1990   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1991 } /* }}} int handle_request */
1992
1993 static void journal_set_free (journal_set *js) /* {{{ */
1994 {
1995   if (js == NULL)
1996     return;
1997
1998   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1999
2000   free(js);
2001 } /* }}} journal_set_free */
2002
2003 static void journal_set_remove (journal_set *js) /* {{{ */
2004 {
2005   if (js == NULL)
2006     return;
2007
2008   for (uint i=0; i < js->files_num; i++)
2009   {
2010     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2011     unlink(js->files[i]);
2012   }
2013 } /* }}} journal_set_remove */
2014
2015 /* close current journal file handle.
2016  * MUST hold journal_lock before calling */
2017 static void journal_close(void) /* {{{ */
2018 {
2019   if (journal_fh != NULL)
2020   {
2021     if (fclose(journal_fh) != 0)
2022       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2023   }
2024
2025   journal_fh = NULL;
2026   journal_size = 0;
2027 } /* }}} journal_close */
2028
2029 /* MUST hold journal_lock before calling */
2030 static void journal_new_file(void) /* {{{ */
2031 {
2032   struct timeval now;
2033   int  new_fd;
2034   char new_file[PATH_MAX + 1];
2035
2036   assert(journal_dir != NULL);
2037   assert(journal_cur != NULL);
2038
2039   journal_close();
2040
2041   gettimeofday(&now, NULL);
2042   /* this format assures that the files sort in strcmp() order */
2043   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2044            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2045
2046   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2047                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2048   if (new_fd < 0)
2049     goto error;
2050
2051   journal_fh = fdopen(new_fd, "a");
2052   if (journal_fh == NULL)
2053     goto error;
2054
2055   journal_size = ftell(journal_fh);
2056   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2057
2058   /* record the file in the journal set */
2059   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2060
2061   return;
2062
2063 error:
2064   RRDD_LOG(LOG_CRIT,
2065            "JOURNALING DISABLED: Error while trying to create %s : %s",
2066            new_file, rrd_strerror(errno));
2067   RRDD_LOG(LOG_CRIT,
2068            "JOURNALING DISABLED: All values will be flushed at shutdown");
2069
2070   close(new_fd);
2071   config_flush_at_shutdown = 1;
2072
2073 } /* }}} journal_new_file */
2074
2075 /* MUST NOT hold journal_lock before calling this */
2076 static void journal_rotate(void) /* {{{ */
2077 {
2078   journal_set *old_js = NULL;
2079
2080   if (journal_dir == NULL)
2081     return;
2082
2083   RRDD_LOG(LOG_DEBUG, "rotating journals");
2084
2085   pthread_mutex_lock(&stats_lock);
2086   ++stats_journal_rotate;
2087   pthread_mutex_unlock(&stats_lock);
2088
2089   pthread_mutex_lock(&journal_lock);
2090
2091   journal_close();
2092
2093   /* rotate the journal sets */
2094   old_js = journal_old;
2095   journal_old = journal_cur;
2096   journal_cur = calloc(1, sizeof(journal_set));
2097
2098   if (journal_cur != NULL)
2099     journal_new_file();
2100   else
2101     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2102
2103   pthread_mutex_unlock(&journal_lock);
2104
2105   journal_set_remove(old_js);
2106   journal_set_free  (old_js);
2107
2108 } /* }}} static void journal_rotate */
2109
2110 /* MUST hold journal_lock when calling */
2111 static void journal_done(void) /* {{{ */
2112 {
2113   if (journal_cur == NULL)
2114     return;
2115
2116   journal_close();
2117
2118   if (config_flush_at_shutdown)
2119   {
2120     RRDD_LOG(LOG_INFO, "removing journals");
2121     journal_set_remove(journal_old);
2122     journal_set_remove(journal_cur);
2123   }
2124   else
2125   {
2126     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2127              "journals will be used at next startup");
2128   }
2129
2130   journal_set_free(journal_cur);
2131   journal_set_free(journal_old);
2132   free(journal_dir);
2133
2134 } /* }}} static void journal_done */
2135
2136 static int journal_write(char *cmd, char *args) /* {{{ */
2137 {
2138   int chars;
2139
2140   if (journal_fh == NULL)
2141     return 0;
2142
2143   pthread_mutex_lock(&journal_lock);
2144   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2145   journal_size += chars;
2146
2147   if (journal_size > JOURNAL_MAX)
2148     journal_new_file();
2149
2150   pthread_mutex_unlock(&journal_lock);
2151
2152   if (chars > 0)
2153   {
2154     pthread_mutex_lock(&stats_lock);
2155     stats_journal_bytes += chars;
2156     pthread_mutex_unlock(&stats_lock);
2157   }
2158
2159   return chars;
2160 } /* }}} static int journal_write */
2161
2162 static int journal_replay (const char *file) /* {{{ */
2163 {
2164   FILE *fh;
2165   int entry_cnt = 0;
2166   int fail_cnt = 0;
2167   uint64_t line = 0;
2168   char entry[CMD_MAX];
2169   time_t now;
2170
2171   if (file == NULL) return 0;
2172
2173   {
2174     char *reason = "unknown error";
2175     int status = 0;
2176     struct stat statbuf;
2177
2178     memset(&statbuf, 0, sizeof(statbuf));
2179     if (stat(file, &statbuf) != 0)
2180     {
2181       reason = "stat error";
2182       status = errno;
2183     }
2184     else if (!S_ISREG(statbuf.st_mode))
2185     {
2186       reason = "not a regular file";
2187       status = EPERM;
2188     }
2189     if (statbuf.st_uid != daemon_uid)
2190     {
2191       reason = "not owned by daemon user";
2192       status = EACCES;
2193     }
2194     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2195     {
2196       reason = "must not be user/group writable";
2197       status = EACCES;
2198     }
2199
2200     if (status != 0)
2201     {
2202       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2203                file, rrd_strerror(status), reason);
2204       return 0;
2205     }
2206   }
2207
2208   fh = fopen(file, "r");
2209   if (fh == NULL)
2210   {
2211     if (errno != ENOENT)
2212       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2213                file, rrd_strerror(errno));
2214     return 0;
2215   }
2216   else
2217     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2218
2219   now = time(NULL);
2220
2221   while(!feof(fh))
2222   {
2223     size_t entry_len;
2224
2225     ++line;
2226     if (fgets(entry, sizeof(entry), fh) == NULL)
2227       break;
2228     entry_len = strlen(entry);
2229
2230     /* check \n termination in case journal writing crashed mid-line */
2231     if (entry_len == 0)
2232       continue;
2233     else if (entry[entry_len - 1] != '\n')
2234     {
2235       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2236       ++fail_cnt;
2237       continue;
2238     }
2239
2240     entry[entry_len - 1] = '\0';
2241
2242     if (handle_request(NULL, now, entry, entry_len) == 0)
2243       ++entry_cnt;
2244     else
2245       ++fail_cnt;
2246   }
2247
2248   fclose(fh);
2249
2250   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2251            entry_cnt, fail_cnt);
2252
2253   return entry_cnt > 0 ? 1 : 0;
2254 } /* }}} static int journal_replay */
2255
2256 static int journal_sort(const void *v1, const void *v2)
2257 {
2258   char **jn1 = (char **) v1;
2259   char **jn2 = (char **) v2;
2260
2261   return strcmp(*jn1,*jn2);
2262 }
2263
2264 static void journal_init(void) /* {{{ */
2265 {
2266   int had_journal = 0;
2267   DIR *dir;
2268   struct dirent *dent;
2269   char path[PATH_MAX+1];
2270
2271   if (journal_dir == NULL) return;
2272
2273   pthread_mutex_lock(&journal_lock);
2274
2275   journal_cur = calloc(1, sizeof(journal_set));
2276   if (journal_cur == NULL)
2277   {
2278     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2279     return;
2280   }
2281
2282   RRDD_LOG(LOG_INFO, "checking for journal files");
2283
2284   /* Handle old journal files during transition.  This gives them the
2285    * correct sort order.  TODO: remove after first release
2286    */
2287   {
2288     char old_path[PATH_MAX+1];
2289     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2290     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2291     rename(old_path, path);
2292
2293     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2294     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2295     rename(old_path, path);
2296   }
2297
2298   dir = opendir(journal_dir);
2299   while ((dent = readdir(dir)) != NULL)
2300   {
2301     /* looks like a journal file? */
2302     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2303       continue;
2304
2305     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2306
2307     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2308     {
2309       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2310                dent->d_name);
2311       break;
2312     }
2313   }
2314   closedir(dir);
2315
2316   qsort(journal_cur->files, journal_cur->files_num,
2317         sizeof(journal_cur->files[0]), journal_sort);
2318
2319   for (uint i=0; i < journal_cur->files_num; i++)
2320     had_journal += journal_replay(journal_cur->files[i]);
2321
2322   journal_new_file();
2323
2324   /* it must have been a crash.  start a flush */
2325   if (had_journal && config_flush_at_shutdown)
2326     flush_old_values(-1);
2327
2328   pthread_mutex_unlock(&journal_lock);
2329
2330   RRDD_LOG(LOG_INFO, "journal processing complete");
2331
2332 } /* }}} static void journal_init */
2333
2334 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2335 {
2336   assert(sock != NULL);
2337
2338   free(sock->rbuf);  sock->rbuf = NULL;
2339   free(sock->wbuf);  sock->wbuf = NULL;
2340   free(sock);
2341 } /* }}} void free_listen_socket */
2342
2343 static void close_connection(listen_socket_t *sock) /* {{{ */
2344 {
2345   if (sock->fd >= 0)
2346   {
2347     close(sock->fd);
2348     sock->fd = -1;
2349   }
2350
2351   free_listen_socket(sock);
2352
2353 } /* }}} void close_connection */
2354
2355 static void *connection_thread_main (void *args) /* {{{ */
2356 {
2357   listen_socket_t *sock;
2358   int fd;
2359
2360   sock = (listen_socket_t *) args;
2361   fd = sock->fd;
2362
2363   /* init read buffers */
2364   sock->next_read = sock->next_cmd = 0;
2365   sock->rbuf = malloc(RBUF_SIZE);
2366   if (sock->rbuf == NULL)
2367   {
2368     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2369     close_connection(sock);
2370     return NULL;
2371   }
2372
2373   pthread_mutex_lock (&connection_threads_lock);
2374   connection_threads_num++;
2375   pthread_mutex_unlock (&connection_threads_lock);
2376
2377   while (state == RUNNING)
2378   {
2379     char *cmd;
2380     ssize_t cmd_len;
2381     ssize_t rbytes;
2382     time_t now;
2383
2384     struct pollfd pollfd;
2385     int status;
2386
2387     pollfd.fd = fd;
2388     pollfd.events = POLLIN | POLLPRI;
2389     pollfd.revents = 0;
2390
2391     status = poll (&pollfd, 1, /* timeout = */ 500);
2392     if (state != RUNNING)
2393       break;
2394     else if (status == 0) /* timeout */
2395       continue;
2396     else if (status < 0) /* error */
2397     {
2398       status = errno;
2399       if (status != EINTR)
2400         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2401       continue;
2402     }
2403
2404     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2405       break;
2406     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2407     {
2408       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2409           "poll(2) returned something unexpected: %#04hx",
2410           pollfd.revents);
2411       break;
2412     }
2413
2414     rbytes = read(fd, sock->rbuf + sock->next_read,
2415                   RBUF_SIZE - sock->next_read);
2416     if (rbytes < 0)
2417     {
2418       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2419       break;
2420     }
2421     else if (rbytes == 0)
2422       break; /* eof */
2423
2424     sock->next_read += rbytes;
2425
2426     if (sock->batch_start)
2427       now = sock->batch_start;
2428     else
2429       now = time(NULL);
2430
2431     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2432     {
2433       status = handle_request (sock, now, cmd, cmd_len+1);
2434       if (status != 0)
2435         goto out_close;
2436     }
2437   }
2438
2439 out_close:
2440   close_connection(sock);
2441
2442   /* Remove this thread from the connection threads list */
2443   pthread_mutex_lock (&connection_threads_lock);
2444   connection_threads_num--;
2445   if (connection_threads_num <= 0)
2446     pthread_cond_broadcast(&connection_threads_done);
2447   pthread_mutex_unlock (&connection_threads_lock);
2448
2449   return (NULL);
2450 } /* }}} void *connection_thread_main */
2451
2452 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2453 {
2454   int fd;
2455   struct sockaddr_un sa;
2456   listen_socket_t *temp;
2457   int status;
2458   const char *path;
2459   char *path_copy, *dir;
2460
2461   path = sock->addr;
2462   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2463     path += strlen("unix:");
2464
2465   /* dirname may modify its argument */
2466   path_copy = strdup(path);
2467   if (path_copy == NULL)
2468   {
2469     fprintf(stderr, "rrdcached: strdup(): %s\n",
2470         rrd_strerror(errno));
2471     return (-1);
2472   }
2473
2474   dir = dirname(path_copy);
2475   if (rrd_mkdir_p(dir, 0777) != 0)
2476   {
2477     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2478         dir, rrd_strerror(errno));
2479     return (-1);
2480   }
2481
2482   free(path_copy);
2483
2484   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2485       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2486   if (temp == NULL)
2487   {
2488     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2489     return (-1);
2490   }
2491   listen_fds = temp;
2492   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2493
2494   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2495   if (fd < 0)
2496   {
2497     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2498              rrd_strerror(errno));
2499     return (-1);
2500   }
2501
2502   memset (&sa, 0, sizeof (sa));
2503   sa.sun_family = AF_UNIX;
2504   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2505
2506   /* if we've gotten this far, we own the pid file.  any daemon started
2507    * with the same args must not be alive.  therefore, ensure that we can
2508    * create the socket...
2509    */
2510   unlink(path);
2511
2512   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2513   if (status != 0)
2514   {
2515     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2516              path, rrd_strerror(errno));
2517     close (fd);
2518     return (-1);
2519   }
2520
2521   /* tweak the sockets group ownership */
2522   if (sock->socket_group != (gid_t)-1)
2523   {
2524     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2525          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2526     {
2527       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2528     }
2529   }
2530
2531   if (sock->socket_permissions != (mode_t)-1)
2532   {
2533     if (chmod(path, sock->socket_permissions) != 0)
2534       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2535           (unsigned int)sock->socket_permissions, strerror(errno));
2536   }
2537
2538   status = listen (fd, /* backlog = */ 10);
2539   if (status != 0)
2540   {
2541     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2542              path, rrd_strerror(errno));
2543     close (fd);
2544     unlink (path);
2545     return (-1);
2546   }
2547
2548   listen_fds[listen_fds_num].fd = fd;
2549   listen_fds[listen_fds_num].family = PF_UNIX;
2550   strncpy(listen_fds[listen_fds_num].addr, path,
2551           sizeof (listen_fds[listen_fds_num].addr) - 1);
2552   listen_fds_num++;
2553
2554   return (0);
2555 } /* }}} int open_listen_socket_unix */
2556
2557 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2558 {
2559   struct addrinfo ai_hints;
2560   struct addrinfo *ai_res;
2561   struct addrinfo *ai_ptr;
2562   char addr_copy[NI_MAXHOST];
2563   char *addr;
2564   char *port;
2565   int status;
2566
2567   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2568   addr_copy[sizeof (addr_copy) - 1] = 0;
2569   addr = addr_copy;
2570
2571   memset (&ai_hints, 0, sizeof (ai_hints));
2572   ai_hints.ai_flags = 0;
2573 #ifdef AI_ADDRCONFIG
2574   ai_hints.ai_flags |= AI_ADDRCONFIG;
2575 #endif
2576   ai_hints.ai_family = AF_UNSPEC;
2577   ai_hints.ai_socktype = SOCK_STREAM;
2578
2579   port = NULL;
2580   if (*addr == '[') /* IPv6+port format */
2581   {
2582     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2583     addr++;
2584
2585     port = strchr (addr, ']');
2586     if (port == NULL)
2587     {
2588       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2589       return (-1);
2590     }
2591     *port = 0;
2592     port++;
2593
2594     if (*port == ':')
2595       port++;
2596     else if (*port == 0)
2597       port = NULL;
2598     else
2599     {
2600       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2601       return (-1);
2602     }
2603   } /* if (*addr == '[') */
2604   else
2605   {
2606     port = rindex(addr, ':');
2607     if (port != NULL)
2608     {
2609       *port = 0;
2610       port++;
2611     }
2612   }
2613   ai_res = NULL;
2614   status = getaddrinfo (addr,
2615                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2616                         &ai_hints, &ai_res);
2617   if (status != 0)
2618   {
2619     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2620              addr, gai_strerror (status));
2621     return (-1);
2622   }
2623
2624   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2625   {
2626     int fd;
2627     listen_socket_t *temp;
2628     int one = 1;
2629
2630     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2631         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2632     if (temp == NULL)
2633     {
2634       fprintf (stderr,
2635                "rrdcached: open_listen_socket_network: realloc failed.\n");
2636       continue;
2637     }
2638     listen_fds = temp;
2639     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2640
2641     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2642     if (fd < 0)
2643     {
2644       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2645                rrd_strerror(errno));
2646       continue;
2647     }
2648
2649     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2650
2651     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2652     if (status != 0)
2653     {
2654       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2655                sock->addr, rrd_strerror(errno));
2656       close (fd);
2657       continue;
2658     }
2659
2660     status = listen (fd, /* backlog = */ 10);
2661     if (status != 0)
2662     {
2663       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2664                sock->addr, rrd_strerror(errno));
2665       close (fd);
2666       freeaddrinfo(ai_res);
2667       return (-1);
2668     }
2669
2670     listen_fds[listen_fds_num].fd = fd;
2671     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2672     listen_fds_num++;
2673   } /* for (ai_ptr) */
2674
2675   freeaddrinfo(ai_res);
2676   return (0);
2677 } /* }}} static int open_listen_socket_network */
2678
2679 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2680 {
2681   assert(sock != NULL);
2682   assert(sock->addr != NULL);
2683
2684   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2685       || sock->addr[0] == '/')
2686     return (open_listen_socket_unix(sock));
2687   else
2688     return (open_listen_socket_network(sock));
2689 } /* }}} int open_listen_socket */
2690
2691 static int close_listen_sockets (void) /* {{{ */
2692 {
2693   size_t i;
2694
2695   for (i = 0; i < listen_fds_num; i++)
2696   {
2697     close (listen_fds[i].fd);
2698
2699     if (listen_fds[i].family == PF_UNIX)
2700       unlink(listen_fds[i].addr);
2701   }
2702
2703   free (listen_fds);
2704   listen_fds = NULL;
2705   listen_fds_num = 0;
2706
2707   return (0);
2708 } /* }}} int close_listen_sockets */
2709
2710 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2711 {
2712   struct pollfd *pollfds;
2713   int pollfds_num;
2714   int status;
2715   int i;
2716
2717   if (listen_fds_num < 1)
2718   {
2719     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2720     return (NULL);
2721   }
2722
2723   pollfds_num = listen_fds_num;
2724   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2725   if (pollfds == NULL)
2726   {
2727     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2728     return (NULL);
2729   }
2730   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2731
2732   RRDD_LOG(LOG_INFO, "listening for connections");
2733
2734   while (state == RUNNING)
2735   {
2736     for (i = 0; i < pollfds_num; i++)
2737     {
2738       pollfds[i].fd = listen_fds[i].fd;
2739       pollfds[i].events = POLLIN | POLLPRI;
2740       pollfds[i].revents = 0;
2741     }
2742
2743     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2744     if (state != RUNNING)
2745       break;
2746     else if (status == 0) /* timeout */
2747       continue;
2748     else if (status < 0) /* error */
2749     {
2750       status = errno;
2751       if (status != EINTR)
2752       {
2753         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2754       }
2755       continue;
2756     }
2757
2758     for (i = 0; i < pollfds_num; i++)
2759     {
2760       listen_socket_t *client_sock;
2761       struct sockaddr_storage client_sa;
2762       socklen_t client_sa_size;
2763       pthread_t tid;
2764       pthread_attr_t attr;
2765
2766       if (pollfds[i].revents == 0)
2767         continue;
2768
2769       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2770       {
2771         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2772             "poll(2) returned something unexpected for listen FD #%i.",
2773             pollfds[i].fd);
2774         continue;
2775       }
2776
2777       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2778       if (client_sock == NULL)
2779       {
2780         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2781         continue;
2782       }
2783       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2784
2785       client_sa_size = sizeof (client_sa);
2786       client_sock->fd = accept (pollfds[i].fd,
2787           (struct sockaddr *) &client_sa, &client_sa_size);
2788       if (client_sock->fd < 0)
2789       {
2790         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2791         free(client_sock);
2792         continue;
2793       }
2794
2795       pthread_attr_init (&attr);
2796       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2797
2798       status = pthread_create (&tid, &attr, connection_thread_main,
2799                                client_sock);
2800       if (status != 0)
2801       {
2802         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2803         close_connection(client_sock);
2804         continue;
2805       }
2806     } /* for (pollfds_num) */
2807   } /* while (state == RUNNING) */
2808
2809   RRDD_LOG(LOG_INFO, "starting shutdown");
2810
2811   close_listen_sockets ();
2812
2813   pthread_mutex_lock (&connection_threads_lock);
2814   while (connection_threads_num > 0)
2815     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2816   pthread_mutex_unlock (&connection_threads_lock);
2817
2818   free(pollfds);
2819
2820   return (NULL);
2821 } /* }}} void *listen_thread_main */
2822
2823 static int daemonize (void) /* {{{ */
2824 {
2825   int pid_fd;
2826   char *base_dir;
2827
2828   daemon_uid = geteuid();
2829
2830   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2831   if (pid_fd < 0)
2832     pid_fd = check_pidfile();
2833   if (pid_fd < 0)
2834     return pid_fd;
2835
2836   /* open all the listen sockets */
2837   if (config_listen_address_list_len > 0)
2838   {
2839     for (size_t i = 0; i < config_listen_address_list_len; i++)
2840       open_listen_socket (config_listen_address_list[i]);
2841
2842     rrd_free_ptrs((void ***) &config_listen_address_list,
2843                   &config_listen_address_list_len);
2844   }
2845   else
2846   {
2847     listen_socket_t sock;
2848     memset(&sock, 0, sizeof(sock));
2849     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2850     open_listen_socket (&sock);
2851   }
2852
2853   if (listen_fds_num < 1)
2854   {
2855     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2856     goto error;
2857   }
2858
2859   if (!stay_foreground)
2860   {
2861     pid_t child;
2862
2863     child = fork ();
2864     if (child < 0)
2865     {
2866       fprintf (stderr, "daemonize: fork(2) failed.\n");
2867       goto error;
2868     }
2869     else if (child > 0)
2870       exit(0);
2871
2872     /* Become session leader */
2873     setsid ();
2874
2875     /* Open the first three file descriptors to /dev/null */
2876     close (2);
2877     close (1);
2878     close (0);
2879
2880     open ("/dev/null", O_RDWR);
2881     if (dup(0) == -1 || dup(0) == -1){
2882         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2883     }
2884   } /* if (!stay_foreground) */
2885
2886   /* Change into the /tmp directory. */
2887   base_dir = (config_base_dir != NULL)
2888     ? config_base_dir
2889     : "/tmp";
2890
2891   if (chdir (base_dir) != 0)
2892   {
2893     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2894     goto error;
2895   }
2896
2897   install_signal_handlers();
2898
2899   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2900   RRDD_LOG(LOG_INFO, "starting up");
2901
2902   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2903                                 (GDestroyNotify) free_cache_item);
2904   if (cache_tree == NULL)
2905   {
2906     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2907     goto error;
2908   }
2909
2910   return write_pidfile (pid_fd);
2911
2912 error:
2913   remove_pidfile();
2914   return -1;
2915 } /* }}} int daemonize */
2916
2917 static int cleanup (void) /* {{{ */
2918 {
2919   pthread_cond_broadcast (&flush_cond);
2920   pthread_join (flush_thread, NULL);
2921
2922   pthread_cond_broadcast (&queue_cond);
2923   for (int i = 0; i < config_queue_threads; i++)
2924     pthread_join (queue_threads[i], NULL);
2925
2926   if (config_flush_at_shutdown)
2927   {
2928     assert(cache_queue_head == NULL);
2929     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2930   }
2931
2932   free(queue_threads);
2933   free(config_base_dir);
2934
2935   pthread_mutex_lock(&cache_lock);
2936   g_tree_destroy(cache_tree);
2937
2938   pthread_mutex_lock(&journal_lock);
2939   journal_done();
2940
2941   RRDD_LOG(LOG_INFO, "goodbye");
2942   closelog ();
2943
2944   remove_pidfile ();
2945   free(config_pid_file);
2946
2947   return (0);
2948 } /* }}} int cleanup */
2949
2950 static int read_options (int argc, char **argv) /* {{{ */
2951 {
2952   int option;
2953   int status = 0;
2954
2955   char **permissions = NULL;
2956   size_t permissions_len = 0;
2957
2958   gid_t  socket_group = (gid_t)-1;
2959   mode_t socket_permissions = (mode_t)-1;
2960
2961   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2962   {
2963     switch (option)
2964     {
2965       case 'g':
2966         stay_foreground=1;
2967         break;
2968
2969       case 'l':
2970       {
2971         listen_socket_t *new;
2972
2973         new = malloc(sizeof(listen_socket_t));
2974         if (new == NULL)
2975         {
2976           fprintf(stderr, "read_options: malloc failed.\n");
2977           return(2);
2978         }
2979         memset(new, 0, sizeof(listen_socket_t));
2980
2981         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2982
2983         /* Add permissions to the socket {{{ */
2984         if (permissions_len != 0)
2985         {
2986           size_t i;
2987           for (i = 0; i < permissions_len; i++)
2988           {
2989             status = socket_permission_add (new, permissions[i]);
2990             if (status != 0)
2991             {
2992               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2993                   "socket failed. Most likely, this permission doesn't "
2994                   "exist. Check your command line.\n", permissions[i]);
2995               status = 4;
2996             }
2997           }
2998         }
2999         else /* if (permissions_len == 0) */
3000         {
3001           /* Add permission for ALL commands to the socket. */
3002           size_t i;
3003           for (i = 0; i < list_of_commands_len; i++)
3004           {
3005             status = socket_permission_add (new, list_of_commands[i].cmd);
3006             if (status != 0)
3007             {
3008               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3009                   "socket failed. This should never happen, ever! Sorry.\n",
3010                   permissions[i]);
3011               status = 4;
3012             }
3013           }
3014         }
3015         /* }}} Done adding permissions. */
3016
3017         new->socket_group = socket_group;
3018         new->socket_permissions = socket_permissions;
3019
3020         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3021                          &config_listen_address_list_len, new))
3022         {
3023           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3024           return (2);
3025         }
3026       }
3027       break;
3028
3029       /* set socket group permissions */
3030       case 's':
3031       {
3032         gid_t group_gid;
3033         struct group *grp;
3034
3035         group_gid = strtoul(optarg, NULL, 10);
3036         if (errno != EINVAL && group_gid>0)
3037         {
3038           /* we were passed a number */
3039           grp = getgrgid(group_gid);
3040         }
3041         else
3042         {
3043           grp = getgrnam(optarg);
3044         }
3045
3046         if (grp)
3047         {
3048           socket_group = grp->gr_gid;
3049         }
3050         else
3051         {
3052           /* no idea what the user wanted... */
3053           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3054           return (5);
3055         }
3056       }
3057       break;
3058
3059       /* set socket file permissions */
3060       case 'm':
3061       {
3062         long  tmp;
3063         char *endptr = NULL;
3064
3065         tmp = strtol (optarg, &endptr, 8);
3066         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3067             || (tmp > 07777) || (tmp < 0)) {
3068           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3069               optarg);
3070           return (5);
3071         }
3072
3073         socket_permissions = (mode_t)tmp;
3074       }
3075       break;
3076
3077       case 'P':
3078       {
3079         char *optcopy;
3080         char *saveptr;
3081         char *dummy;
3082         char *ptr;
3083
3084         rrd_free_ptrs ((void *) &permissions, &permissions_len);
3085
3086         optcopy = strdup (optarg);
3087         dummy = optcopy;
3088         saveptr = NULL;
3089         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3090         {
3091           dummy = NULL;
3092           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
3093         }
3094
3095         free (optcopy);
3096       }
3097       break;
3098
3099       case 'f':
3100       {
3101         int temp;
3102
3103         temp = atoi (optarg);
3104         if (temp > 0)
3105           config_flush_interval = temp;
3106         else
3107         {
3108           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3109           status = 3;
3110         }
3111       }
3112       break;
3113
3114       case 'w':
3115       {
3116         int temp;
3117
3118         temp = atoi (optarg);
3119         if (temp > 0)
3120           config_write_interval = temp;
3121         else
3122         {
3123           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3124           status = 2;
3125         }
3126       }
3127       break;
3128
3129       case 'z':
3130       {
3131         int temp;
3132
3133         temp = atoi(optarg);
3134         if (temp > 0)
3135           config_write_jitter = temp;
3136         else
3137         {
3138           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3139           status = 2;
3140         }
3141
3142         break;
3143       }
3144
3145       case 't':
3146       {
3147         int threads;
3148         threads = atoi(optarg);
3149         if (threads >= 1)
3150           config_queue_threads = threads;
3151         else
3152         {
3153           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3154           return 1;
3155         }
3156       }
3157       break;
3158
3159       case 'B':
3160         config_write_base_only = 1;
3161         break;
3162
3163       case 'b':
3164       {
3165         size_t len;
3166         char base_realpath[PATH_MAX];
3167
3168         if (config_base_dir != NULL)
3169           free (config_base_dir);
3170         config_base_dir = strdup (optarg);
3171         if (config_base_dir == NULL)
3172         {
3173           fprintf (stderr, "read_options: strdup failed.\n");
3174           return (3);
3175         }
3176
3177         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3178         {
3179           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3180               config_base_dir, rrd_strerror (errno));
3181           return (3);
3182         }
3183
3184         /* make sure that the base directory is not resolved via
3185          * symbolic links.  this makes some performance-enhancing
3186          * assumptions possible (we don't have to resolve paths
3187          * that start with a "/")
3188          */
3189         if (realpath(config_base_dir, base_realpath) == NULL)
3190         {
3191           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3192               "%s\n", config_base_dir, rrd_strerror(errno));
3193           return 5;
3194         }
3195
3196         len = strlen (config_base_dir);
3197         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3198         {
3199           config_base_dir[len - 1] = 0;
3200           len--;
3201         }
3202
3203         if (len < 1)
3204         {
3205           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3206           return (4);
3207         }
3208
3209         _config_base_dir_len = len;
3210
3211         len = strlen (base_realpath);
3212         while ((len > 0) && (base_realpath[len - 1] == '/'))
3213         {
3214           base_realpath[len - 1] = '\0';
3215           len--;
3216         }
3217
3218         if (strncmp(config_base_dir,
3219                          base_realpath, sizeof(base_realpath)) != 0)
3220         {
3221           fprintf(stderr,
3222                   "Base directory (-b) resolved via file system links!\n"
3223                   "Please consult rrdcached '-b' documentation!\n"
3224                   "Consider specifying the real directory (%s)\n",
3225                   base_realpath);
3226           return 5;
3227         }
3228       }
3229       break;
3230
3231       case 'p':
3232       {
3233         if (config_pid_file != NULL)
3234           free (config_pid_file);
3235         config_pid_file = strdup (optarg);
3236         if (config_pid_file == NULL)
3237         {
3238           fprintf (stderr, "read_options: strdup failed.\n");
3239           return (3);
3240         }
3241       }
3242       break;
3243
3244       case 'F':
3245         config_flush_at_shutdown = 1;
3246         break;
3247
3248       case 'j':
3249       {
3250         const char *dir = journal_dir = strdup(optarg);
3251
3252         status = rrd_mkdir_p(dir, 0777);
3253         if (status != 0)
3254         {
3255           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3256               dir, rrd_strerror(errno));
3257           return 6;
3258         }
3259
3260         if (access(dir, R_OK|W_OK|X_OK) != 0)
3261         {
3262           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3263                   errno ? rrd_strerror(errno) : "");
3264           return 6;
3265         }
3266       }
3267       break;
3268
3269       case 'a':
3270       {
3271         int temp = atoi(optarg);
3272         if (temp > 0)
3273           config_alloc_chunk = temp;
3274         else
3275         {
3276           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3277           return 10;
3278         }
3279       }
3280       break;
3281
3282       case 'h':
3283       case '?':
3284         printf ("RRDCacheD %s\n"
3285             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3286             "\n"
3287             "Usage: rrdcached [options]\n"
3288             "\n"
3289             "Valid options are:\n"
3290             "  -l <address>  Socket address to listen to.\n"
3291             "  -P <perms>    Sets the permissions to assign to all following "
3292                             "sockets\n"
3293             "  -w <seconds>  Interval in which to write data.\n"
3294             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3295             "  -t <threads>  Number of write threads.\n"
3296             "  -f <seconds>  Interval in which to flush dead data.\n"
3297             "  -p <file>     Location of the PID-file.\n"
3298             "  -b <dir>      Base directory to change to.\n"
3299             "  -B            Restrict file access to paths within -b <dir>\n"
3300             "  -g            Do not fork and run in the foreground.\n"
3301             "  -j <dir>      Directory in which to create the journal files.\n"
3302             "  -F            Always flush all updates at shutdown\n"
3303             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3304             "                (the socket will also have read/write permissions "
3305                             "for that group)\n"
3306             "  -m <mode>     File permissions (octal) of all following UNIX "
3307                             "sockets\n"
3308             "  -a <size>     Memory allocation chunk size. Default is 1."
3309             "\n"
3310             "For more information and a detailed description of all options "
3311             "please refer\n"
3312             "to the rrdcached(1) manual page.\n",
3313             VERSION);
3314         if (option == 'h')
3315           status = -1;
3316         else
3317           status = 1;
3318         break;
3319     } /* switch (option) */
3320   } /* while (getopt) */
3321
3322   /* advise the user when values are not sane */
3323   if (config_flush_interval < 2 * config_write_interval)
3324     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3325             " 2x write interval (-w) !\n");
3326   if (config_write_jitter > config_write_interval)
3327     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3328             " write interval (-w) !\n");
3329
3330   if (config_write_base_only && config_base_dir == NULL)
3331     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3332             "  Consult the rrdcached documentation\n");
3333
3334   if (journal_dir == NULL)
3335     config_flush_at_shutdown = 1;
3336
3337   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3338
3339   return (status);
3340 } /* }}} int read_options */
3341
3342 int main (int argc, char **argv)
3343 {
3344   int status;
3345
3346   status = read_options (argc, argv);
3347   if (status != 0)
3348   {
3349     if (status < 0)
3350       status = 0;
3351     return (status);
3352   }
3353
3354   status = daemonize ();
3355   if (status != 0)
3356   {
3357     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3358     return (1);
3359   }
3360
3361   journal_init();
3362
3363   /* start the queue threads */
3364   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3365   if (queue_threads == NULL)
3366   {
3367     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3368     cleanup();
3369     return (1);
3370   }
3371   for (int i = 0; i < config_queue_threads; i++)
3372   {
3373     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3374     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3375     if (status != 0)
3376     {
3377       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3378       cleanup();
3379       return (1);
3380     }
3381   }
3382
3383   /* start the flush thread */
3384   memset(&flush_thread, 0, sizeof(flush_thread));
3385   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3386   if (status != 0)
3387   {
3388     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3389     cleanup();
3390     return (1);
3391   }
3392
3393   listen_thread_main (NULL);
3394   cleanup ();
3395
3396   return (0);
3397 } /* int main */
3398
3399 /*
3400  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3401  */