rrdcached: Let the -s, -m and -P options affect the default socket as well -- Sebasti...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #include <glib-2.0/glib.h>
113 /* }}} */
114
115 #define RRDD_LOG(severity, ...) \
116   do { \
117     if (stay_foreground) { \
118       fprintf(stderr, __VA_ARGS__); \
119       fprintf(stderr, "\n"); } \
120     syslog ((severity), __VA_ARGS__); \
121   } while (0)
122
123 /*
124  * Types
125  */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
127
128 struct listen_socket_s
129 {
130   int fd;
131   char addr[PATH_MAX + 1];
132   int family;
133
134   /* state for BATCH processing */
135   time_t batch_start;
136   int batch_cmd;
137
138   /* buffered IO */
139   char *rbuf;
140   off_t next_cmd;
141   off_t next_read;
142
143   char *wbuf;
144   ssize_t wbuf_len;
145
146   uint32_t permissions;
147
148   gid_t  socket_group;
149   mode_t socket_permissions;
150 };
151 typedef struct listen_socket_s listen_socket_t;
152
153 struct command_s;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
157                         time_t UNUSED(now),\
158                         char  UNUSED(*buffer),\
159                         size_t UNUSED(buffer_size)
160
161 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
162                         DISPATCH_PROTO
163
164 struct command_s {
165   char   *cmd;
166   int (*handler)(HANDLER_PROTO);
167
168   char  context;                /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT      (1<<0)
170 #define CMD_CONTEXT_BATCH       (1<<1)
171 #define CMD_CONTEXT_JOURNAL     (1<<2)
172 #define CMD_CONTEXT_ANY         (0x7f)
173
174   char *syntax;
175   char *help;
176 };
177
178 struct cache_item_s;
179 typedef struct cache_item_s cache_item_t;
180 struct cache_item_s
181 {
182   char *file;
183   char **values;
184   size_t values_num;            /* number of valid pointers */
185   size_t values_alloc;          /* number of allocated pointers */
186   time_t last_flush_time;
187   time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE  (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
190   int flags;
191   pthread_cond_t  flushed;
192   cache_item_t *prev;
193   cache_item_t *next;
194 };
195
196 struct callback_flush_data_s
197 {
198   time_t now;
199   time_t abs_timeout;
200   char **keys;
201   size_t keys_num;
202 };
203 typedef struct callback_flush_data_s callback_flush_data_t;
204
205 enum queue_side_e
206 {
207   HEAD,
208   TAIL
209 };
210 typedef enum queue_side_e queue_side_t;
211
212 /* describe a set of journal files */
213 typedef struct {
214   char **files;
215   size_t files_num;
216 } journal_set;
217
218 /* max length of socket command or response */
219 #define CMD_MAX 4096
220 #define RBUF_SIZE (CMD_MAX*2)
221
222 /*
223  * Variables
224  */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
227
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
230
231 static listen_socket_t default_socket;
232
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
238
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
242
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
249
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
265
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
268
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
277
278 /* Journaled updates */
279 #define JOURNAL_REPLAY(s) ((s) == NULL)
280 #define JOURNAL_BASE "rrd.journal"
281 static journal_set *journal_cur = NULL;
282 static journal_set *journal_old = NULL;
283 static char *journal_dir = NULL;
284 static FILE *journal_fh = NULL;         /* current journal file handle */
285 static long  journal_size = 0;          /* current journal size */
286 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
287 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
288 static int journal_write(char *cmd, char *args);
289 static void journal_done(void);
290 static void journal_rotate(void);
291
292 /* prototypes for forward refernces */
293 static int handle_request_help (HANDLER_PROTO);
294
295 /* 
296  * Functions
297  */
298 static void sig_common (const char *sig) /* {{{ */
299 {
300   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
301   state = FLUSHING;
302   pthread_cond_broadcast(&flush_cond);
303   pthread_cond_broadcast(&queue_cond);
304 } /* }}} void sig_common */
305
306 static void sig_int_handler (int UNUSED(s)) /* {{{ */
307 {
308   sig_common("INT");
309 } /* }}} void sig_int_handler */
310
311 static void sig_term_handler (int UNUSED(s)) /* {{{ */
312 {
313   sig_common("TERM");
314 } /* }}} void sig_term_handler */
315
316 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
317 {
318   config_flush_at_shutdown = 1;
319   sig_common("USR1");
320 } /* }}} void sig_usr1_handler */
321
322 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
323 {
324   config_flush_at_shutdown = 0;
325   sig_common("USR2");
326 } /* }}} void sig_usr2_handler */
327
328 static void install_signal_handlers(void) /* {{{ */
329 {
330   /* These structures are static, because `sigaction' behaves weird if the are
331    * overwritten.. */
332   static struct sigaction sa_int;
333   static struct sigaction sa_term;
334   static struct sigaction sa_pipe;
335   static struct sigaction sa_usr1;
336   static struct sigaction sa_usr2;
337
338   /* Install signal handlers */
339   memset (&sa_int, 0, sizeof (sa_int));
340   sa_int.sa_handler = sig_int_handler;
341   sigaction (SIGINT, &sa_int, NULL);
342
343   memset (&sa_term, 0, sizeof (sa_term));
344   sa_term.sa_handler = sig_term_handler;
345   sigaction (SIGTERM, &sa_term, NULL);
346
347   memset (&sa_pipe, 0, sizeof (sa_pipe));
348   sa_pipe.sa_handler = SIG_IGN;
349   sigaction (SIGPIPE, &sa_pipe, NULL);
350
351   memset (&sa_pipe, 0, sizeof (sa_usr1));
352   sa_usr1.sa_handler = sig_usr1_handler;
353   sigaction (SIGUSR1, &sa_usr1, NULL);
354
355   memset (&sa_usr2, 0, sizeof (sa_usr2));
356   sa_usr2.sa_handler = sig_usr2_handler;
357   sigaction (SIGUSR2, &sa_usr2, NULL);
358
359 } /* }}} void install_signal_handlers */
360
361 static int open_pidfile(char *action, int oflag) /* {{{ */
362 {
363   int fd;
364   const char *file;
365   char *file_copy, *dir;
366
367   file = (config_pid_file != NULL)
368     ? config_pid_file
369     : LOCALSTATEDIR "/run/rrdcached.pid";
370
371   /* dirname may modify its argument */
372   file_copy = strdup(file);
373   if (file_copy == NULL)
374   {
375     fprintf(stderr, "rrdcached: strdup(): %s\n",
376         rrd_strerror(errno));
377     return -1;
378   }
379
380   dir = dirname(file_copy);
381   if (rrd_mkdir_p(dir, 0777) != 0)
382   {
383     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384         dir, rrd_strerror(errno));
385     return -1;
386   }
387
388   free(file_copy);
389
390   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391   if (fd < 0)
392     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393             action, file, rrd_strerror(errno));
394
395   return(fd);
396 } /* }}} static int open_pidfile */
397
398 /* check existing pid file to see whether a daemon is running */
399 static int check_pidfile(void)
400 {
401   int pid_fd;
402   pid_t pid;
403   char pid_str[16];
404
405   pid_fd = open_pidfile("open", O_RDWR);
406   if (pid_fd < 0)
407     return pid_fd;
408
409   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
410     return -1;
411
412   pid = atoi(pid_str);
413   if (pid <= 0)
414     return -1;
415
416   /* another running process that we can signal COULD be
417    * a competing rrdcached */
418   if (pid != getpid() && kill(pid, 0) == 0)
419   {
420     fprintf(stderr,
421             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
422     close(pid_fd);
423     return -1;
424   }
425
426   lseek(pid_fd, 0, SEEK_SET);
427   if (ftruncate(pid_fd, 0) == -1)
428   {
429     fprintf(stderr,
430             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
431     close(pid_fd);
432     return -1;
433   }
434
435   fprintf(stderr,
436           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
437           "rrdcached: starting normally.\n", pid);
438
439   return pid_fd;
440 } /* }}} static int check_pidfile */
441
442 static int write_pidfile (int fd) /* {{{ */
443 {
444   pid_t pid;
445   FILE *fh;
446
447   pid = getpid ();
448
449   fh = fdopen (fd, "w");
450   if (fh == NULL)
451   {
452     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
453     close(fd);
454     return (-1);
455   }
456
457   fprintf (fh, "%i\n", (int) pid);
458   fclose (fh);
459
460   return (0);
461 } /* }}} int write_pidfile */
462
463 static int remove_pidfile (void) /* {{{ */
464 {
465   char *file;
466   int status;
467
468   file = (config_pid_file != NULL)
469     ? config_pid_file
470     : LOCALSTATEDIR "/run/rrdcached.pid";
471
472   status = unlink (file);
473   if (status == 0)
474     return (0);
475   return (errno);
476 } /* }}} int remove_pidfile */
477
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
479 {
480   char *eol;
481
482   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483                sock->next_read - sock->next_cmd);
484
485   if (eol == NULL)
486   {
487     /* no commands left, move remainder back to front of rbuf */
488     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489             sock->next_read - sock->next_cmd);
490     sock->next_read -= sock->next_cmd;
491     sock->next_cmd = 0;
492     *len = 0;
493     return NULL;
494   }
495   else
496   {
497     char *cmd = sock->rbuf + sock->next_cmd;
498     *eol = '\0';
499
500     sock->next_cmd = eol - sock->rbuf + 1;
501
502     if (eol > sock->rbuf && *(eol-1) == '\r')
503       *(--eol) = '\0'; /* handle "\r\n" EOL */
504
505     *len = eol - cmd;
506
507     return cmd;
508   }
509
510   /* NOTREACHED */
511   assert(1==0);
512 } /* }}} char *next_cmd */
513
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
516 {
517   char *new_buf;
518
519   assert(sock != NULL);
520
521   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522   if (new_buf == NULL)
523   {
524     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525     return -1;
526   }
527
528   strncpy(new_buf + sock->wbuf_len, str, len + 1);
529
530   sock->wbuf = new_buf;
531   sock->wbuf_len += len;
532
533   return 0;
534 } /* }}} static int add_to_wbuf */
535
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
538 {
539   va_list argp;
540   char buffer[CMD_MAX];
541   int len;
542
543   if (JOURNAL_REPLAY(sock)) return 0;
544   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
545
546   va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550   len = vsprintf(buffer, fmt, argp);
551 #endif
552   va_end(argp);
553   if (len < 0)
554   {
555     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556     return -1;
557   }
558
559   return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
561
562 static int count_lines(char *str) /* {{{ */
563 {
564   int lines = 0;
565
566   if (str != NULL)
567   {
568     while ((str = strchr(str, '\n')) != NULL)
569     {
570       ++lines;
571       ++str;
572     }
573   }
574
575   return lines;
576 } /* }}} static int count_lines */
577
578 /* send the response back to the user.
579  * returns 0 on success, -1 on error
580  * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582                           char *fmt, ...) /* {{{ */
583 {
584   va_list argp;
585   char buffer[CMD_MAX];
586   int lines;
587   ssize_t wrote;
588   int rclen, len;
589
590   if (JOURNAL_REPLAY(sock)) return rc;
591
592   if (sock->batch_start)
593   {
594     if (rc == RESP_OK)
595       return rc; /* no response on success during BATCH */
596     lines = sock->batch_cmd;
597   }
598   else if (rc == RESP_OK)
599     lines = count_lines(sock->wbuf);
600   else
601     lines = -1;
602
603   rclen = sprintf(buffer, "%d ", lines);
604   va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608   len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610   va_end(argp);
611   if (len < 0)
612     return -1;
613
614   len += rclen;
615
616   /* append the result to the wbuf, don't write to the user */
617   if (sock->batch_start)
618     return add_to_wbuf(sock, buffer, len);
619
620   /* first write must be complete */
621   if (len != write(sock->fd, buffer, len))
622   {
623     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624     return -1;
625   }
626
627   if (sock->wbuf != NULL && rc == RESP_OK)
628   {
629     wrote = 0;
630     while (wrote < sock->wbuf_len)
631     {
632       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633       if (wb <= 0)
634       {
635         RRDD_LOG(LOG_INFO, "send_response: could not write results");
636         return -1;
637       }
638       wrote += wb;
639     }
640   }
641
642   free(sock->wbuf); sock->wbuf = NULL;
643   sock->wbuf_len = 0;
644
645   return 0;
646 } /* }}} */
647
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
649 {
650   ci->values = NULL;
651   ci->values_num = 0;
652   ci->values_alloc = 0;
653
654   ci->last_flush_time = when;
655   if (config_write_jitter > 0)
656     ci->last_flush_time += (rrd_random() % config_write_jitter);
657 }
658
659 /* remove_from_queue
660  * remove a "cache_item_t" item from the queue.
661  * must hold 'cache_lock' when calling this
662  */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
664 {
665   if (ci == NULL) return;
666   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
667
668   if (ci->prev == NULL)
669     cache_queue_head = ci->next; /* reset head */
670   else
671     ci->prev->next = ci->next;
672
673   if (ci->next == NULL)
674     cache_queue_tail = ci->prev; /* reset the tail */
675   else
676     ci->next->prev = ci->prev;
677
678   ci->next = ci->prev = NULL;
679   ci->flags &= ~CI_FLAGS_IN_QUEUE;
680
681   pthread_mutex_lock (&stats_lock);
682   assert (stats_queue_length > 0);
683   stats_queue_length--;
684   pthread_mutex_unlock (&stats_lock);
685
686 } /* }}} static void remove_from_queue */
687
688 /* free the resources associated with the cache_item_t
689  * must hold cache_lock when calling this function
690  */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
692 {
693   if (ci == NULL) return NULL;
694
695   remove_from_queue(ci);
696
697   for (size_t i=0; i < ci->values_num; i++)
698     free(ci->values[i]);
699
700   free (ci->values);
701   free (ci->file);
702
703   /* in case anyone is waiting */
704   pthread_cond_broadcast(&ci->flushed);
705   pthread_cond_destroy(&ci->flushed);
706
707   free (ci);
708
709   return NULL;
710 } /* }}} static void *free_cache_item */
711
712 /*
713  * enqueue_cache_item:
714  * `cache_lock' must be acquired before calling this function!
715  */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717     queue_side_t side)
718 {
719   if (ci == NULL)
720     return (-1);
721
722   if (ci->values_num == 0)
723     return (0);
724
725   if (side == HEAD)
726   {
727     if (cache_queue_head == ci)
728       return 0;
729
730     /* remove if further down in queue */
731     remove_from_queue(ci);
732
733     ci->prev = NULL;
734     ci->next = cache_queue_head;
735     if (ci->next != NULL)
736       ci->next->prev = ci;
737     cache_queue_head = ci;
738
739     if (cache_queue_tail == NULL)
740       cache_queue_tail = cache_queue_head;
741   }
742   else /* (side == TAIL) */
743   {
744     /* We don't move values back in the list.. */
745     if (ci->flags & CI_FLAGS_IN_QUEUE)
746       return (0);
747
748     assert (ci->next == NULL);
749     assert (ci->prev == NULL);
750
751     ci->prev = cache_queue_tail;
752
753     if (cache_queue_tail == NULL)
754       cache_queue_head = ci;
755     else
756       cache_queue_tail->next = ci;
757
758     cache_queue_tail = ci;
759   }
760
761   ci->flags |= CI_FLAGS_IN_QUEUE;
762
763   pthread_cond_signal(&queue_cond);
764   pthread_mutex_lock (&stats_lock);
765   stats_queue_length++;
766   pthread_mutex_unlock (&stats_lock);
767
768   return (0);
769 } /* }}} int enqueue_cache_item */
770
771 /*
772  * tree_callback_flush:
773  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774  * while this is in progress.
775  */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777     gpointer data)
778 {
779   cache_item_t *ci;
780   callback_flush_data_t *cfd;
781
782   ci = (cache_item_t *) value;
783   cfd = (callback_flush_data_t *) data;
784
785   if (ci->flags & CI_FLAGS_IN_QUEUE)
786     return FALSE;
787
788   if (ci->values_num > 0
789       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790   {
791     enqueue_cache_item (ci, TAIL);
792   }
793   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794       && (ci->values_num <= 0))
795   {
796     assert ((char *) key == ci->file);
797     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798     {
799       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800       return (FALSE);
801     }
802   }
803
804   return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
806
807 static int flush_old_values (int max_age)
808 {
809   callback_flush_data_t cfd;
810   size_t k;
811
812   memset (&cfd, 0, sizeof (cfd));
813   /* Pass the current time as user data so that we don't need to call
814    * `time' for each node. */
815   cfd.now = time (NULL);
816   cfd.keys = NULL;
817   cfd.keys_num = 0;
818
819   if (max_age > 0)
820     cfd.abs_timeout = cfd.now - max_age;
821   else
822     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
823
824   /* `tree_callback_flush' will return the keys of all values that haven't
825    * been touched in the last `config_flush_interval' seconds in `cfd'.
826    * The char*'s in this array point to the same memory as ci->file, so we
827    * don't need to free them separately. */
828   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
829
830   for (k = 0; k < cfd.keys_num; k++)
831   {
832     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833     /* should never fail, since we have held the cache_lock
834      * the entire time */
835     assert(status == TRUE);
836   }
837
838   if (cfd.keys != NULL)
839   {
840     free (cfd.keys);
841     cfd.keys = NULL;
842   }
843
844   return (0);
845 } /* int flush_old_values */
846
847 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
848 {
849   struct timeval now;
850   struct timespec next_flush;
851   int status;
852
853   gettimeofday (&now, NULL);
854   next_flush.tv_sec = now.tv_sec + config_flush_interval;
855   next_flush.tv_nsec = 1000 * now.tv_usec;
856
857   pthread_mutex_lock(&cache_lock);
858
859   while (state == RUNNING)
860   {
861     gettimeofday (&now, NULL);
862     if ((now.tv_sec > next_flush.tv_sec)
863         || ((now.tv_sec == next_flush.tv_sec)
864           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865     {
866       RRDD_LOG(LOG_DEBUG, "flushing old values");
867
868       /* Determine the time of the next cache flush. */
869       next_flush.tv_sec = now.tv_sec + config_flush_interval;
870
871       /* Flush all values that haven't been written in the last
872        * `config_write_interval' seconds. */
873       flush_old_values (config_write_interval);
874
875       /* unlock the cache while we rotate so we don't block incoming
876        * updates if the fsync() blocks on disk I/O */
877       pthread_mutex_unlock(&cache_lock);
878       journal_rotate();
879       pthread_mutex_lock(&cache_lock);
880     }
881
882     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
883     if (status != 0 && status != ETIMEDOUT)
884     {
885       RRDD_LOG (LOG_ERR, "flush_thread_main: "
886                 "pthread_cond_timedwait returned %i.", status);
887     }
888   }
889
890   if (config_flush_at_shutdown)
891     flush_old_values (-1); /* flush everything */
892
893   state = SHUTDOWN;
894
895   pthread_mutex_unlock(&cache_lock);
896
897   return NULL;
898 } /* void *flush_thread_main */
899
900 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
901 {
902   pthread_mutex_lock (&cache_lock);
903
904   while (state != SHUTDOWN
905          || (cache_queue_head != NULL && config_flush_at_shutdown))
906   {
907     cache_item_t *ci;
908     char *file;
909     char **values;
910     size_t values_num;
911     int status;
912
913     /* Now, check if there's something to store away. If not, wait until
914      * something comes in. */
915     if (cache_queue_head == NULL)
916     {
917       status = pthread_cond_wait (&queue_cond, &cache_lock);
918       if ((status != 0) && (status != ETIMEDOUT))
919       {
920         RRDD_LOG (LOG_ERR, "queue_thread_main: "
921             "pthread_cond_wait returned %i.", status);
922       }
923     }
924
925     /* Check if a value has arrived. This may be NULL if we timed out or there
926      * was an interrupt such as a signal. */
927     if (cache_queue_head == NULL)
928       continue;
929
930     ci = cache_queue_head;
931
932     /* copy the relevant parts */
933     file = strdup (ci->file);
934     if (file == NULL)
935     {
936       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
937       continue;
938     }
939
940     assert(ci->values != NULL);
941     assert(ci->values_num > 0);
942
943     values = ci->values;
944     values_num = ci->values_num;
945
946     wipe_ci_values(ci, time(NULL));
947     remove_from_queue(ci);
948
949     pthread_mutex_unlock (&cache_lock);
950
951     rrd_clear_error ();
952     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
953     if (status != 0)
954     {
955       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
956           "rrd_update_r (%s) failed with status %i. (%s)",
957           file, status, rrd_get_error());
958     }
959
960     journal_write("wrote", file);
961
962     /* Search again in the tree.  It's possible someone issued a "FORGET"
963      * while we were writing the update values. */
964     pthread_mutex_lock(&cache_lock);
965     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
966     if (ci)
967       pthread_cond_broadcast(&ci->flushed);
968     pthread_mutex_unlock(&cache_lock);
969
970     if (status == 0)
971     {
972       pthread_mutex_lock (&stats_lock);
973       stats_updates_written++;
974       stats_data_sets_written += values_num;
975       pthread_mutex_unlock (&stats_lock);
976     }
977
978     rrd_free_ptrs((void ***) &values, &values_num);
979     free(file);
980
981     pthread_mutex_lock (&cache_lock);
982   }
983   pthread_mutex_unlock (&cache_lock);
984
985   return (NULL);
986 } /* }}} void *queue_thread_main */
987
988 static int buffer_get_field (char **buffer_ret, /* {{{ */
989     size_t *buffer_size_ret, char **field_ret)
990 {
991   char *buffer;
992   size_t buffer_pos;
993   size_t buffer_size;
994   char *field;
995   size_t field_size;
996   int status;
997
998   buffer = *buffer_ret;
999   buffer_pos = 0;
1000   buffer_size = *buffer_size_ret;
1001   field = *buffer_ret;
1002   field_size = 0;
1003
1004   if (buffer_size <= 0)
1005     return (-1);
1006
1007   /* This is ensured by `handle_request'. */
1008   assert (buffer[buffer_size - 1] == '\0');
1009
1010   status = -1;
1011   while (buffer_pos < buffer_size)
1012   {
1013     /* Check for end-of-field or end-of-buffer */
1014     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1015     {
1016       field[field_size] = 0;
1017       field_size++;
1018       buffer_pos++;
1019       status = 0;
1020       break;
1021     }
1022     /* Handle escaped characters. */
1023     else if (buffer[buffer_pos] == '\\')
1024     {
1025       if (buffer_pos >= (buffer_size - 1))
1026         break;
1027       buffer_pos++;
1028       field[field_size] = buffer[buffer_pos];
1029       field_size++;
1030       buffer_pos++;
1031     }
1032     /* Normal operation */ 
1033     else
1034     {
1035       field[field_size] = buffer[buffer_pos];
1036       field_size++;
1037       buffer_pos++;
1038     }
1039   } /* while (buffer_pos < buffer_size) */
1040
1041   if (status != 0)
1042     return (status);
1043
1044   *buffer_ret = buffer + buffer_pos;
1045   *buffer_size_ret = buffer_size - buffer_pos;
1046   *field_ret = field;
1047
1048   return (0);
1049 } /* }}} int buffer_get_field */
1050
1051 /* if we're restricting writes to the base directory,
1052  * check whether the file falls within the dir
1053  * returns 1 if OK, otherwise 0
1054  */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1056 {
1057   assert(file != NULL);
1058
1059   if (!config_write_base_only
1060       || JOURNAL_REPLAY(sock)
1061       || config_base_dir == NULL)
1062     return 1;
1063
1064   if (strstr(file, "../") != NULL) goto err;
1065
1066   /* relative paths without "../" are ok */
1067   if (*file != '/') return 1;
1068
1069   /* file must be of the format base + "/" + <1+ char filename> */
1070   if (strlen(file) < _config_base_dir_len + 2) goto err;
1071   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072   if (*(file + _config_base_dir_len) != '/') goto err;
1073
1074   return 1;
1075
1076 err:
1077   if (sock != NULL && sock->fd >= 0)
1078     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1079
1080   return 0;
1081 } /* }}} static int check_file_access */
1082
1083 /* when using a base dir, convert relative paths to absolute paths.
1084  * if necessary, modifies the "filename" pointer to point
1085  * to the new path created in "tmp".  "tmp" is provided
1086  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087  *
1088  * this allows us to optimize for the expected case (absolute path)
1089  * with a no-op.
1090  */
1091 static void get_abs_path(char **filename, char *tmp)
1092 {
1093   assert(tmp != NULL);
1094   assert(filename != NULL && *filename != NULL);
1095
1096   if (config_base_dir == NULL || **filename == '/')
1097     return;
1098
1099   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100   *filename = tmp;
1101 } /* }}} static int get_abs_path */
1102
1103 static int flush_file (const char *filename) /* {{{ */
1104 {
1105   cache_item_t *ci;
1106
1107   pthread_mutex_lock (&cache_lock);
1108
1109   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110   if (ci == NULL)
1111   {
1112     pthread_mutex_unlock (&cache_lock);
1113     return (ENOENT);
1114   }
1115
1116   if (ci->values_num > 0)
1117   {
1118     /* Enqueue at head */
1119     enqueue_cache_item (ci, HEAD);
1120     pthread_cond_wait(&ci->flushed, &cache_lock);
1121   }
1122
1123   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1124    * may have been purged during our cond_wait() */
1125
1126   pthread_mutex_unlock(&cache_lock);
1127
1128   return (0);
1129 } /* }}} int flush_file */
1130
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1132 {
1133   char *err = "Syntax error.\n";
1134
1135   if (cmd && cmd->syntax)
1136     err = cmd->syntax;
1137
1138   return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1140
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1142 {
1143   uint64_t copy_queue_length;
1144   uint64_t copy_updates_received;
1145   uint64_t copy_flush_received;
1146   uint64_t copy_updates_written;
1147   uint64_t copy_data_sets_written;
1148   uint64_t copy_journal_bytes;
1149   uint64_t copy_journal_rotate;
1150
1151   uint64_t tree_nodes_number;
1152   uint64_t tree_depth;
1153
1154   pthread_mutex_lock (&stats_lock);
1155   copy_queue_length       = stats_queue_length;
1156   copy_updates_received   = stats_updates_received;
1157   copy_flush_received     = stats_flush_received;
1158   copy_updates_written    = stats_updates_written;
1159   copy_data_sets_written  = stats_data_sets_written;
1160   copy_journal_bytes      = stats_journal_bytes;
1161   copy_journal_rotate     = stats_journal_rotate;
1162   pthread_mutex_unlock (&stats_lock);
1163
1164   pthread_mutex_lock (&cache_lock);
1165   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1167   pthread_mutex_unlock (&cache_lock);
1168
1169   add_response_info(sock,
1170                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1171   add_response_info(sock,
1172                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173   add_response_info(sock,
1174                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175   add_response_info(sock,
1176                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177   add_response_info(sock,
1178                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1183
1184   send_response(sock, RESP_OK, "Statistics follow\n");
1185
1186   return (0);
1187 } /* }}} int handle_request_stats */
1188
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1190 {
1191   char *file, file_tmp[PATH_MAX];
1192   int status;
1193
1194   status = buffer_get_field (&buffer, &buffer_size, &file);
1195   if (status != 0)
1196   {
1197     return syntax_error(sock,cmd);
1198   }
1199   else
1200   {
1201     pthread_mutex_lock(&stats_lock);
1202     stats_flush_received++;
1203     pthread_mutex_unlock(&stats_lock);
1204
1205     get_abs_path(&file, file_tmp);
1206     if (!check_file_access(file, sock)) return 0;
1207
1208     status = flush_file (file);
1209     if (status == 0)
1210       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211     else if (status == ENOENT)
1212     {
1213       /* no file in our tree; see whether it exists at all */
1214       struct stat statbuf;
1215
1216       memset(&statbuf, 0, sizeof(statbuf));
1217       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219       else
1220         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221     }
1222     else if (status < 0)
1223       return send_response(sock, RESP_ERR, "Internal error.\n");
1224     else
1225       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226   }
1227
1228   /* NOTREACHED */
1229   assert(1==0);
1230 } /* }}} int handle_request_flush */
1231
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1233 {
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1239
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1242
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1244 {
1245   int status;
1246   char *file, file_tmp[PATH_MAX];
1247   cache_item_t *ci;
1248
1249   status = buffer_get_field(&buffer, &buffer_size, &file);
1250   if (status != 0)
1251     return syntax_error(sock,cmd);
1252
1253   get_abs_path(&file, file_tmp);
1254
1255   pthread_mutex_lock(&cache_lock);
1256   ci = g_tree_lookup(cache_tree, file);
1257   if (ci == NULL)
1258   {
1259     pthread_mutex_unlock(&cache_lock);
1260     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261   }
1262
1263   for (size_t i=0; i < ci->values_num; i++)
1264     add_response_info(sock, "%s\n", ci->values[i]);
1265
1266   pthread_mutex_unlock(&cache_lock);
1267   return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1269
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1271 {
1272   int status;
1273   gboolean found;
1274   char *file, file_tmp[PATH_MAX];
1275
1276   status = buffer_get_field(&buffer, &buffer_size, &file);
1277   if (status != 0)
1278     return syntax_error(sock,cmd);
1279
1280   get_abs_path(&file, file_tmp);
1281   if (!check_file_access(file, sock)) return 0;
1282
1283   pthread_mutex_lock(&cache_lock);
1284   found = g_tree_remove(cache_tree, file);
1285   pthread_mutex_unlock(&cache_lock);
1286
1287   if (found == TRUE)
1288   {
1289     if (!JOURNAL_REPLAY(sock))
1290       journal_write("forget", file);
1291
1292     return send_response(sock, RESP_OK, "Gone!\n");
1293   }
1294   else
1295     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1296
1297   /* NOTREACHED */
1298   assert(1==0);
1299 } /* }}} static int handle_request_forget */
1300
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1302 {
1303   cache_item_t *ci;
1304
1305   pthread_mutex_lock(&cache_lock);
1306
1307   ci = cache_queue_head;
1308   while (ci != NULL)
1309   {
1310     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311     ci = ci->next;
1312   }
1313
1314   pthread_mutex_unlock(&cache_lock);
1315
1316   return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1318
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1320 {
1321   char *file, file_tmp[PATH_MAX];
1322   int values_num = 0;
1323   int status;
1324   char orig_buf[CMD_MAX];
1325
1326   cache_item_t *ci;
1327
1328   /* save it for the journal later */
1329   if (!JOURNAL_REPLAY(sock))
1330     strncpy(orig_buf, buffer, buffer_size);
1331
1332   status = buffer_get_field (&buffer, &buffer_size, &file);
1333   if (status != 0)
1334     return syntax_error(sock,cmd);
1335
1336   pthread_mutex_lock(&stats_lock);
1337   stats_updates_received++;
1338   pthread_mutex_unlock(&stats_lock);
1339
1340   get_abs_path(&file, file_tmp);
1341   if (!check_file_access(file, sock)) return 0;
1342
1343   pthread_mutex_lock (&cache_lock);
1344   ci = g_tree_lookup (cache_tree, file);
1345
1346   if (ci == NULL) /* {{{ */
1347   {
1348     struct stat statbuf;
1349     cache_item_t *tmp;
1350
1351     /* don't hold the lock while we setup; stat(2) might block */
1352     pthread_mutex_unlock(&cache_lock);
1353
1354     memset (&statbuf, 0, sizeof (statbuf));
1355     status = stat (file, &statbuf);
1356     if (status != 0)
1357     {
1358       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1359
1360       status = errno;
1361       if (status == ENOENT)
1362         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363       else
1364         return send_response(sock, RESP_ERR,
1365                              "stat failed with error %i.\n", status);
1366     }
1367     if (!S_ISREG (statbuf.st_mode))
1368       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1369
1370     if (access(file, R_OK|W_OK) != 0)
1371       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372                            file, rrd_strerror(errno));
1373
1374     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375     if (ci == NULL)
1376     {
1377       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1378
1379       return send_response(sock, RESP_ERR, "malloc failed.\n");
1380     }
1381     memset (ci, 0, sizeof (cache_item_t));
1382
1383     ci->file = strdup (file);
1384     if (ci->file == NULL)
1385     {
1386       free (ci);
1387       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1388
1389       return send_response(sock, RESP_ERR, "strdup failed.\n");
1390     }
1391
1392     wipe_ci_values(ci, now);
1393     ci->flags = CI_FLAGS_IN_TREE;
1394     pthread_cond_init(&ci->flushed, NULL);
1395
1396     pthread_mutex_lock(&cache_lock);
1397
1398     /* another UPDATE might have added this entry in the meantime */
1399     tmp = g_tree_lookup (cache_tree, file);
1400     if (tmp == NULL)
1401       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402     else
1403     {
1404       free_cache_item (ci);
1405       ci = tmp;
1406     }
1407
1408     /* state may have changed while we were unlocked */
1409     if (state == SHUTDOWN)
1410       return -1;
1411   } /* }}} */
1412   assert (ci != NULL);
1413
1414   /* don't re-write updates in replay mode */
1415   if (!JOURNAL_REPLAY(sock))
1416     journal_write("update", orig_buf);
1417
1418   while (buffer_size > 0)
1419   {
1420     char *value;
1421     time_t stamp;
1422     char *eostamp;
1423
1424     status = buffer_get_field (&buffer, &buffer_size, &value);
1425     if (status != 0)
1426     {
1427       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428       break;
1429     }
1430
1431     /* make sure update time is always moving forward */
1432     stamp = strtol(value, &eostamp, 10);
1433     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "Cannot find timestamp in '%s'!\n", value);
1438     }
1439     else if (stamp <= ci->last_update_stamp)
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "illegal attempt to update using time %ld when last"
1444                            " update time is %ld (minimum one second step)\n",
1445                            stamp, ci->last_update_stamp);
1446     }
1447     else
1448       ci->last_update_stamp = stamp;
1449
1450     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451                               &ci->values_alloc, config_alloc_chunk))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1456
1457     values_num++;
1458   }
1459
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1466
1467   pthread_mutex_unlock (&cache_lock);
1468
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1474
1475   /* NOTREACHED */
1476   assert(1==0);
1477
1478 } /* }}} int handle_request_update */
1479
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1481 {
1482   char *file, file_tmp[PATH_MAX];
1483   char *cf;
1484
1485   char *start_str;
1486   char *end_str;
1487   time_t start_tm;
1488   time_t end_tm;
1489
1490   unsigned long step;
1491   unsigned long ds_cnt;
1492   char **ds_namv;
1493   rrd_value_t *data;
1494
1495   int status;
1496   unsigned long i;
1497   time_t t;
1498   rrd_value_t *data_ptr;
1499
1500   file = NULL;
1501   cf = NULL;
1502   start_str = NULL;
1503   end_str = NULL;
1504
1505   /* Read the arguments */
1506   do /* while (0) */
1507   {
1508     status = buffer_get_field (&buffer, &buffer_size, &file);
1509     if (status != 0)
1510       break;
1511
1512     status = buffer_get_field (&buffer, &buffer_size, &cf);
1513     if (status != 0)
1514       break;
1515
1516     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1517     if (status != 0)
1518     {
1519       start_str = NULL;
1520       status = 0;
1521       break;
1522     }
1523
1524     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1525     if (status != 0)
1526     {
1527       end_str = NULL;
1528       status = 0;
1529       break;
1530     }
1531   } while (0);
1532
1533   if (status != 0)
1534     return (syntax_error(sock,cmd));
1535
1536   get_abs_path(&file, file_tmp);
1537   if (!check_file_access(file, sock)) return 0;
1538
1539   status = flush_file (file);
1540   if ((status != 0) && (status != ENOENT))
1541     return (send_response (sock, RESP_ERR,
1542           "flush_file (%s) failed with status %i.\n", file, status));
1543
1544   t = time (NULL); /* "now" */
1545
1546   /* Parse start time */
1547   if (start_str != NULL)
1548   {
1549     char *endptr;
1550     long value;
1551
1552     endptr = NULL;
1553     errno = 0;
1554     value = strtol (start_str, &endptr, /* base = */ 0);
1555     if ((endptr == start_str) || (errno != 0))
1556       return (send_response(sock, RESP_ERR,
1557             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1558             start_str));
1559
1560     if (value > 0)
1561       start_tm = (time_t) value;
1562     else
1563       start_tm = (time_t) (t + value);
1564   }
1565   else
1566   {
1567     start_tm = t - 86400;
1568   }
1569
1570   /* Parse end time */
1571   if (end_str != NULL)
1572   {
1573     char *endptr;
1574     long value;
1575
1576     endptr = NULL;
1577     errno = 0;
1578     value = strtol (end_str, &endptr, /* base = */ 0);
1579     if ((endptr == end_str) || (errno != 0))
1580       return (send_response(sock, RESP_ERR,
1581             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1582             end_str));
1583
1584     if (value > 0)
1585       end_tm = (time_t) value;
1586     else
1587       end_tm = (time_t) (t + value);
1588   }
1589   else
1590   {
1591     end_tm = t;
1592   }
1593
1594   step = -1;
1595   ds_cnt = 0;
1596   ds_namv = NULL;
1597   data = NULL;
1598
1599   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1600       &ds_cnt, &ds_namv, &data);
1601   if (status != 0)
1602     return (send_response(sock, RESP_ERR,
1603           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1604
1605   add_response_info (sock, "FlushVersion: %lu\n", 1);
1606   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1607   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1608   add_response_info (sock, "Step: %lu\n", step);
1609   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1610
1611 #define SSTRCAT(buffer,str,buffer_fill) do { \
1612     size_t str_len = strlen (str); \
1613     if ((buffer_fill + str_len) > sizeof (buffer)) \
1614       str_len = sizeof (buffer) - buffer_fill; \
1615     if (str_len > 0) { \
1616       strncpy (buffer + buffer_fill, str, str_len); \
1617       buffer_fill += str_len; \
1618       assert (buffer_fill <= sizeof (buffer)); \
1619       if (buffer_fill == sizeof (buffer)) \
1620         buffer[buffer_fill - 1] = 0; \
1621       else \
1622         buffer[buffer_fill] = 0; \
1623     } \
1624   } while (0)
1625
1626   { /* Add list of DS names */
1627     char linebuf[1024];
1628     size_t linebuf_fill;
1629
1630     memset (linebuf, 0, sizeof (linebuf));
1631     linebuf_fill = 0;
1632     for (i = 0; i < ds_cnt; i++)
1633     {
1634       if (i > 0)
1635         SSTRCAT (linebuf, " ", linebuf_fill);
1636       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1637       rrd_freemem(ds_namv[i]);
1638     }
1639     rrd_freemem(ds_namv);
1640     add_response_info (sock, "DSName: %s\n", linebuf);
1641   }
1642
1643   /* Add the actual data */
1644   assert (step > 0);
1645   data_ptr = data;
1646   for (t = start_tm + step; t <= end_tm; t += step)
1647   {
1648     char linebuf[1024];
1649     size_t linebuf_fill;
1650     char tmp[128];
1651
1652     memset (linebuf, 0, sizeof (linebuf));
1653     linebuf_fill = 0;
1654     for (i = 0; i < ds_cnt; i++)
1655     {
1656       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1657       tmp[sizeof (tmp) - 1] = 0;
1658       SSTRCAT (linebuf, tmp, linebuf_fill);
1659
1660       data_ptr++;
1661     }
1662
1663     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1664   } /* for (t) */
1665   rrd_freemem(data);
1666
1667   return (send_response (sock, RESP_OK, "Success\n"));
1668 #undef SSTRCAT
1669 } /* }}} int handle_request_fetch */
1670
1671 /* we came across a "WROTE" entry during journal replay.
1672  * throw away any values that we have accumulated for this file
1673  */
1674 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1675 {
1676   cache_item_t *ci;
1677   const char *file = buffer;
1678
1679   pthread_mutex_lock(&cache_lock);
1680
1681   ci = g_tree_lookup(cache_tree, file);
1682   if (ci == NULL)
1683   {
1684     pthread_mutex_unlock(&cache_lock);
1685     return (0);
1686   }
1687
1688   if (ci->values)
1689     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1690
1691   wipe_ci_values(ci, now);
1692   remove_from_queue(ci);
1693
1694   pthread_mutex_unlock(&cache_lock);
1695   return (0);
1696 } /* }}} int handle_request_wrote */
1697
1698 /* start "BATCH" processing */
1699 static int batch_start (HANDLER_PROTO) /* {{{ */
1700 {
1701   int status;
1702   if (sock->batch_start)
1703     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1704
1705   status = send_response(sock, RESP_OK,
1706                          "Go ahead.  End with dot '.' on its own line.\n");
1707   sock->batch_start = time(NULL);
1708   sock->batch_cmd = 0;
1709
1710   return status;
1711 } /* }}} static int batch_start */
1712
1713 /* finish "BATCH" processing and return results to the client */
1714 static int batch_done (HANDLER_PROTO) /* {{{ */
1715 {
1716   assert(sock->batch_start);
1717   sock->batch_start = 0;
1718   sock->batch_cmd  = 0;
1719   return send_response(sock, RESP_OK, "errors\n");
1720 } /* }}} static int batch_done */
1721
1722 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1723 {
1724   return -1;
1725 } /* }}} static int handle_request_quit */
1726
1727 static command_t list_of_commands[] = { /* {{{ */
1728   {
1729     "UPDATE",
1730     handle_request_update,
1731     CMD_CONTEXT_ANY,
1732     "UPDATE <filename> <values> [<values> ...]\n"
1733     ,
1734     "Adds the given file to the internal cache if it is not yet known and\n"
1735     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1736     "for details.\n"
1737     "\n"
1738     "Each <values> has the following form:\n"
1739     "  <values> = <time>:<value>[:<value>[...]]\n"
1740     "See the rrdupdate(1) manpage for details.\n"
1741   },
1742   {
1743     "WROTE",
1744     handle_request_wrote,
1745     CMD_CONTEXT_JOURNAL,
1746     NULL,
1747     NULL
1748   },
1749   {
1750     "FLUSH",
1751     handle_request_flush,
1752     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1753     "FLUSH <filename>\n"
1754     ,
1755     "Adds the given filename to the head of the update queue and returns\n"
1756     "after it has been dequeued.\n"
1757   },
1758   {
1759     "FLUSHALL",
1760     handle_request_flushall,
1761     CMD_CONTEXT_CLIENT,
1762     "FLUSHALL\n"
1763     ,
1764     "Triggers writing of all pending updates.  Returns immediately.\n"
1765   },
1766   {
1767     "PENDING",
1768     handle_request_pending,
1769     CMD_CONTEXT_CLIENT,
1770     "PENDING <filename>\n"
1771     ,
1772     "Shows any 'pending' updates for a file, in order.\n"
1773     "The updates shown have not yet been written to the underlying RRD file.\n"
1774   },
1775   {
1776     "FORGET",
1777     handle_request_forget,
1778     CMD_CONTEXT_ANY,
1779     "FORGET <filename>\n"
1780     ,
1781     "Removes the file completely from the cache.\n"
1782     "Any pending updates for the file will be lost.\n"
1783   },
1784   {
1785     "QUEUE",
1786     handle_request_queue,
1787     CMD_CONTEXT_CLIENT,
1788     "QUEUE\n"
1789     ,
1790         "Shows all files in the output queue.\n"
1791     "The output is zero or more lines in the following format:\n"
1792     "(where <num_vals> is the number of values to be written)\n"
1793     "\n"
1794     "<num_vals> <filename>\n"
1795   },
1796   {
1797     "STATS",
1798     handle_request_stats,
1799     CMD_CONTEXT_CLIENT,
1800     "STATS\n"
1801     ,
1802     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1803     "a description of the values.\n"
1804   },
1805   {
1806     "HELP",
1807     handle_request_help,
1808     CMD_CONTEXT_CLIENT,
1809     "HELP [<command>]\n",
1810     NULL, /* special! */
1811   },
1812   {
1813     "BATCH",
1814     batch_start,
1815     CMD_CONTEXT_CLIENT,
1816     "BATCH\n"
1817     ,
1818     "The 'BATCH' command permits the client to initiate a bulk load\n"
1819     "   of commands to rrdcached.\n"
1820     "\n"
1821     "Usage:\n"
1822     "\n"
1823     "    client: BATCH\n"
1824     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1825     "    client: command #1\n"
1826     "    client: command #2\n"
1827     "    client: ... and so on\n"
1828     "    client: .\n"
1829     "    server: 2 errors\n"
1830     "    server: 7 message for command #7\n"
1831     "    server: 9 message for command #9\n"
1832     "\n"
1833     "For more information, consult the rrdcached(1) documentation.\n"
1834   },
1835   {
1836     ".",   /* BATCH terminator */
1837     batch_done,
1838     CMD_CONTEXT_BATCH,
1839     NULL,
1840     NULL
1841   },
1842   {
1843     "FETCH",
1844     handle_request_fetch,
1845     CMD_CONTEXT_CLIENT,
1846     "FETCH <file> <CF> [<start> [<end>]]\n"
1847     ,
1848     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
1849   },
1850   {
1851     "QUIT",
1852     handle_request_quit,
1853     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1854     "QUIT\n"
1855     ,
1856     "Disconnect from rrdcached.\n"
1857   }
1858 }; /* }}} command_t list_of_commands[] */
1859 static size_t list_of_commands_len = sizeof (list_of_commands)
1860   / sizeof (list_of_commands[0]);
1861
1862 static command_t *find_command(char *cmd)
1863 {
1864   size_t i;
1865
1866   for (i = 0; i < list_of_commands_len; i++)
1867     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1868       return (&list_of_commands[i]);
1869   return NULL;
1870 }
1871
1872 /* We currently use the index in the `list_of_commands' array as a bit position
1873  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1874  * outside these functions so that switching to a more elegant storage method
1875  * is easily possible. */
1876 static ssize_t find_command_index (const char *cmd) /* {{{ */
1877 {
1878   size_t i;
1879
1880   for (i = 0; i < list_of_commands_len; i++)
1881     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1882       return ((ssize_t) i);
1883   return (-1);
1884 } /* }}} ssize_t find_command_index */
1885
1886 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1887     const char *cmd)
1888 {
1889   ssize_t i;
1890
1891   if (JOURNAL_REPLAY(sock))
1892     return (1);
1893
1894   if (cmd == NULL)
1895     return (-1);
1896
1897   if ((strcasecmp ("QUIT", cmd) == 0)
1898       || (strcasecmp ("HELP", cmd) == 0))
1899     return (1);
1900   else if (strcmp (".", cmd) == 0)
1901     cmd = "BATCH";
1902
1903   i = find_command_index (cmd);
1904   if (i < 0)
1905     return (-1);
1906   assert (i < 32);
1907
1908   if ((sock->permissions & (1 << i)) != 0)
1909     return (1);
1910   return (0);
1911 } /* }}} int socket_permission_check */
1912
1913 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1914     const char *cmd)
1915 {
1916   ssize_t i;
1917
1918   i = find_command_index (cmd);
1919   if (i < 0)
1920     return (-1);
1921   assert (i < 32);
1922
1923   sock->permissions |= (1 << i);
1924   return (0);
1925 } /* }}} int socket_permission_add */
1926
1927 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
1928 {
1929   sock->permissions = 0;
1930 } /* }}} socket_permission_clear */
1931
1932 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
1933     listen_socket_t *src)
1934 {
1935   dest->permissions = src->permissions;
1936 } /* }}} socket_permission_copy */
1937
1938 /* check whether commands are received in the expected context */
1939 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1940 {
1941   if (JOURNAL_REPLAY(sock))
1942     return (cmd->context & CMD_CONTEXT_JOURNAL);
1943   else if (sock->batch_start)
1944     return (cmd->context & CMD_CONTEXT_BATCH);
1945   else
1946     return (cmd->context & CMD_CONTEXT_CLIENT);
1947
1948   /* NOTREACHED */
1949   assert(1==0);
1950 }
1951
1952 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1953 {
1954   int status;
1955   char *cmd_str;
1956   char *resp_txt;
1957   command_t *help = NULL;
1958
1959   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1960   if (status == 0)
1961     help = find_command(cmd_str);
1962
1963   if (help && (help->syntax || help->help))
1964   {
1965     char tmp[CMD_MAX];
1966
1967     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1968     resp_txt = tmp;
1969
1970     if (help->syntax)
1971       add_response_info(sock, "Usage: %s\n", help->syntax);
1972
1973     if (help->help)
1974       add_response_info(sock, "%s\n", help->help);
1975   }
1976   else
1977   {
1978     size_t i;
1979
1980     resp_txt = "Command overview\n";
1981
1982     for (i = 0; i < list_of_commands_len; i++)
1983     {
1984       if (list_of_commands[i].syntax == NULL)
1985         continue;
1986       add_response_info (sock, "%s", list_of_commands[i].syntax);
1987     }
1988   }
1989
1990   return send_response(sock, RESP_OK, resp_txt);
1991 } /* }}} int handle_request_help */
1992
1993 static int handle_request (DISPATCH_PROTO) /* {{{ */
1994 {
1995   char *buffer_ptr = buffer;
1996   char *cmd_str = NULL;
1997   command_t *cmd = NULL;
1998   int status;
1999
2000   assert (buffer[buffer_size - 1] == '\0');
2001
2002   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2003   if (status != 0)
2004   {
2005     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2006     return (-1);
2007   }
2008
2009   if (sock != NULL && sock->batch_start)
2010     sock->batch_cmd++;
2011
2012   cmd = find_command(cmd_str);
2013   if (!cmd)
2014     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2015
2016   if (!socket_permission_check (sock, cmd->cmd))
2017     return send_response(sock, RESP_ERR, "Permission denied.\n");
2018
2019   if (!command_check_context(sock, cmd))
2020     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2021
2022   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2023 } /* }}} int handle_request */
2024
2025 static void journal_set_free (journal_set *js) /* {{{ */
2026 {
2027   if (js == NULL)
2028     return;
2029
2030   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2031
2032   free(js);
2033 } /* }}} journal_set_free */
2034
2035 static void journal_set_remove (journal_set *js) /* {{{ */
2036 {
2037   if (js == NULL)
2038     return;
2039
2040   for (uint i=0; i < js->files_num; i++)
2041   {
2042     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2043     unlink(js->files[i]);
2044   }
2045 } /* }}} journal_set_remove */
2046
2047 /* close current journal file handle.
2048  * MUST hold journal_lock before calling */
2049 static void journal_close(void) /* {{{ */
2050 {
2051   if (journal_fh != NULL)
2052   {
2053     if (fclose(journal_fh) != 0)
2054       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2055   }
2056
2057   journal_fh = NULL;
2058   journal_size = 0;
2059 } /* }}} journal_close */
2060
2061 /* MUST hold journal_lock before calling */
2062 static void journal_new_file(void) /* {{{ */
2063 {
2064   struct timeval now;
2065   int  new_fd;
2066   char new_file[PATH_MAX + 1];
2067
2068   assert(journal_dir != NULL);
2069   assert(journal_cur != NULL);
2070
2071   journal_close();
2072
2073   gettimeofday(&now, NULL);
2074   /* this format assures that the files sort in strcmp() order */
2075   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2076            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2077
2078   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2079                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2080   if (new_fd < 0)
2081     goto error;
2082
2083   journal_fh = fdopen(new_fd, "a");
2084   if (journal_fh == NULL)
2085     goto error;
2086
2087   journal_size = ftell(journal_fh);
2088   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2089
2090   /* record the file in the journal set */
2091   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2092
2093   return;
2094
2095 error:
2096   RRDD_LOG(LOG_CRIT,
2097            "JOURNALING DISABLED: Error while trying to create %s : %s",
2098            new_file, rrd_strerror(errno));
2099   RRDD_LOG(LOG_CRIT,
2100            "JOURNALING DISABLED: All values will be flushed at shutdown");
2101
2102   close(new_fd);
2103   config_flush_at_shutdown = 1;
2104
2105 } /* }}} journal_new_file */
2106
2107 /* MUST NOT hold journal_lock before calling this */
2108 static void journal_rotate(void) /* {{{ */
2109 {
2110   journal_set *old_js = NULL;
2111
2112   if (journal_dir == NULL)
2113     return;
2114
2115   RRDD_LOG(LOG_DEBUG, "rotating journals");
2116
2117   pthread_mutex_lock(&stats_lock);
2118   ++stats_journal_rotate;
2119   pthread_mutex_unlock(&stats_lock);
2120
2121   pthread_mutex_lock(&journal_lock);
2122
2123   journal_close();
2124
2125   /* rotate the journal sets */
2126   old_js = journal_old;
2127   journal_old = journal_cur;
2128   journal_cur = calloc(1, sizeof(journal_set));
2129
2130   if (journal_cur != NULL)
2131     journal_new_file();
2132   else
2133     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2134
2135   pthread_mutex_unlock(&journal_lock);
2136
2137   journal_set_remove(old_js);
2138   journal_set_free  (old_js);
2139
2140 } /* }}} static void journal_rotate */
2141
2142 /* MUST hold journal_lock when calling */
2143 static void journal_done(void) /* {{{ */
2144 {
2145   if (journal_cur == NULL)
2146     return;
2147
2148   journal_close();
2149
2150   if (config_flush_at_shutdown)
2151   {
2152     RRDD_LOG(LOG_INFO, "removing journals");
2153     journal_set_remove(journal_old);
2154     journal_set_remove(journal_cur);
2155   }
2156   else
2157   {
2158     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2159              "journals will be used at next startup");
2160   }
2161
2162   journal_set_free(journal_cur);
2163   journal_set_free(journal_old);
2164   free(journal_dir);
2165
2166 } /* }}} static void journal_done */
2167
2168 static int journal_write(char *cmd, char *args) /* {{{ */
2169 {
2170   int chars;
2171
2172   if (journal_fh == NULL)
2173     return 0;
2174
2175   pthread_mutex_lock(&journal_lock);
2176   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2177   journal_size += chars;
2178
2179   if (journal_size > JOURNAL_MAX)
2180     journal_new_file();
2181
2182   pthread_mutex_unlock(&journal_lock);
2183
2184   if (chars > 0)
2185   {
2186     pthread_mutex_lock(&stats_lock);
2187     stats_journal_bytes += chars;
2188     pthread_mutex_unlock(&stats_lock);
2189   }
2190
2191   return chars;
2192 } /* }}} static int journal_write */
2193
2194 static int journal_replay (const char *file) /* {{{ */
2195 {
2196   FILE *fh;
2197   int entry_cnt = 0;
2198   int fail_cnt = 0;
2199   uint64_t line = 0;
2200   char entry[CMD_MAX];
2201   time_t now;
2202
2203   if (file == NULL) return 0;
2204
2205   {
2206     char *reason = "unknown error";
2207     int status = 0;
2208     struct stat statbuf;
2209
2210     memset(&statbuf, 0, sizeof(statbuf));
2211     if (stat(file, &statbuf) != 0)
2212     {
2213       reason = "stat error";
2214       status = errno;
2215     }
2216     else if (!S_ISREG(statbuf.st_mode))
2217     {
2218       reason = "not a regular file";
2219       status = EPERM;
2220     }
2221     if (statbuf.st_uid != daemon_uid)
2222     {
2223       reason = "not owned by daemon user";
2224       status = EACCES;
2225     }
2226     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2227     {
2228       reason = "must not be user/group writable";
2229       status = EACCES;
2230     }
2231
2232     if (status != 0)
2233     {
2234       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2235                file, rrd_strerror(status), reason);
2236       return 0;
2237     }
2238   }
2239
2240   fh = fopen(file, "r");
2241   if (fh == NULL)
2242   {
2243     if (errno != ENOENT)
2244       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2245                file, rrd_strerror(errno));
2246     return 0;
2247   }
2248   else
2249     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2250
2251   now = time(NULL);
2252
2253   while(!feof(fh))
2254   {
2255     size_t entry_len;
2256
2257     ++line;
2258     if (fgets(entry, sizeof(entry), fh) == NULL)
2259       break;
2260     entry_len = strlen(entry);
2261
2262     /* check \n termination in case journal writing crashed mid-line */
2263     if (entry_len == 0)
2264       continue;
2265     else if (entry[entry_len - 1] != '\n')
2266     {
2267       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2268       ++fail_cnt;
2269       continue;
2270     }
2271
2272     entry[entry_len - 1] = '\0';
2273
2274     if (handle_request(NULL, now, entry, entry_len) == 0)
2275       ++entry_cnt;
2276     else
2277       ++fail_cnt;
2278   }
2279
2280   fclose(fh);
2281
2282   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2283            entry_cnt, fail_cnt);
2284
2285   return entry_cnt > 0 ? 1 : 0;
2286 } /* }}} static int journal_replay */
2287
2288 static int journal_sort(const void *v1, const void *v2)
2289 {
2290   char **jn1 = (char **) v1;
2291   char **jn2 = (char **) v2;
2292
2293   return strcmp(*jn1,*jn2);
2294 }
2295
2296 static void journal_init(void) /* {{{ */
2297 {
2298   int had_journal = 0;
2299   DIR *dir;
2300   struct dirent *dent;
2301   char path[PATH_MAX+1];
2302
2303   if (journal_dir == NULL) return;
2304
2305   pthread_mutex_lock(&journal_lock);
2306
2307   journal_cur = calloc(1, sizeof(journal_set));
2308   if (journal_cur == NULL)
2309   {
2310     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2311     return;
2312   }
2313
2314   RRDD_LOG(LOG_INFO, "checking for journal files");
2315
2316   /* Handle old journal files during transition.  This gives them the
2317    * correct sort order.  TODO: remove after first release
2318    */
2319   {
2320     char old_path[PATH_MAX+1];
2321     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2322     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2323     rename(old_path, path);
2324
2325     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2326     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2327     rename(old_path, path);
2328   }
2329
2330   dir = opendir(journal_dir);
2331   if (!dir) {
2332     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2333     return;
2334   }
2335   while ((dent = readdir(dir)) != NULL)
2336   {
2337     /* looks like a journal file? */
2338     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2339       continue;
2340
2341     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2342
2343     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2344     {
2345       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2346                dent->d_name);
2347       break;
2348     }
2349   }
2350   closedir(dir);
2351
2352   qsort(journal_cur->files, journal_cur->files_num,
2353         sizeof(journal_cur->files[0]), journal_sort);
2354
2355   for (uint i=0; i < journal_cur->files_num; i++)
2356     had_journal += journal_replay(journal_cur->files[i]);
2357
2358   journal_new_file();
2359
2360   /* it must have been a crash.  start a flush */
2361   if (had_journal && config_flush_at_shutdown)
2362     flush_old_values(-1);
2363
2364   pthread_mutex_unlock(&journal_lock);
2365
2366   RRDD_LOG(LOG_INFO, "journal processing complete");
2367
2368 } /* }}} static void journal_init */
2369
2370 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2371 {
2372   assert(sock != NULL);
2373
2374   free(sock->rbuf);  sock->rbuf = NULL;
2375   free(sock->wbuf);  sock->wbuf = NULL;
2376   free(sock);
2377 } /* }}} void free_listen_socket */
2378
2379 static void close_connection(listen_socket_t *sock) /* {{{ */
2380 {
2381   if (sock->fd >= 0)
2382   {
2383     close(sock->fd);
2384     sock->fd = -1;
2385   }
2386
2387   free_listen_socket(sock);
2388
2389 } /* }}} void close_connection */
2390
2391 static void *connection_thread_main (void *args) /* {{{ */
2392 {
2393   listen_socket_t *sock;
2394   int fd;
2395
2396   sock = (listen_socket_t *) args;
2397   fd = sock->fd;
2398
2399   /* init read buffers */
2400   sock->next_read = sock->next_cmd = 0;
2401   sock->rbuf = malloc(RBUF_SIZE);
2402   if (sock->rbuf == NULL)
2403   {
2404     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2405     close_connection(sock);
2406     return NULL;
2407   }
2408
2409   pthread_mutex_lock (&connection_threads_lock);
2410   connection_threads_num++;
2411   pthread_mutex_unlock (&connection_threads_lock);
2412
2413   while (state == RUNNING)
2414   {
2415     char *cmd;
2416     ssize_t cmd_len;
2417     ssize_t rbytes;
2418     time_t now;
2419
2420     struct pollfd pollfd;
2421     int status;
2422
2423     pollfd.fd = fd;
2424     pollfd.events = POLLIN | POLLPRI;
2425     pollfd.revents = 0;
2426
2427     status = poll (&pollfd, 1, /* timeout = */ 500);
2428     if (state != RUNNING)
2429       break;
2430     else if (status == 0) /* timeout */
2431       continue;
2432     else if (status < 0) /* error */
2433     {
2434       status = errno;
2435       if (status != EINTR)
2436         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2437       continue;
2438     }
2439
2440     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2441       break;
2442     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2443     {
2444       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2445           "poll(2) returned something unexpected: %#04hx",
2446           pollfd.revents);
2447       break;
2448     }
2449
2450     rbytes = read(fd, sock->rbuf + sock->next_read,
2451                   RBUF_SIZE - sock->next_read);
2452     if (rbytes < 0)
2453     {
2454       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2455       break;
2456     }
2457     else if (rbytes == 0)
2458       break; /* eof */
2459
2460     sock->next_read += rbytes;
2461
2462     if (sock->batch_start)
2463       now = sock->batch_start;
2464     else
2465       now = time(NULL);
2466
2467     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2468     {
2469       status = handle_request (sock, now, cmd, cmd_len+1);
2470       if (status != 0)
2471         goto out_close;
2472     }
2473   }
2474
2475 out_close:
2476   close_connection(sock);
2477
2478   /* Remove this thread from the connection threads list */
2479   pthread_mutex_lock (&connection_threads_lock);
2480   connection_threads_num--;
2481   if (connection_threads_num <= 0)
2482     pthread_cond_broadcast(&connection_threads_done);
2483   pthread_mutex_unlock (&connection_threads_lock);
2484
2485   return (NULL);
2486 } /* }}} void *connection_thread_main */
2487
2488 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2489 {
2490   int fd;
2491   struct sockaddr_un sa;
2492   listen_socket_t *temp;
2493   int status;
2494   const char *path;
2495   char *path_copy, *dir;
2496
2497   path = sock->addr;
2498   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2499     path += strlen("unix:");
2500
2501   /* dirname may modify its argument */
2502   path_copy = strdup(path);
2503   if (path_copy == NULL)
2504   {
2505     fprintf(stderr, "rrdcached: strdup(): %s\n",
2506         rrd_strerror(errno));
2507     return (-1);
2508   }
2509
2510   dir = dirname(path_copy);
2511   if (rrd_mkdir_p(dir, 0777) != 0)
2512   {
2513     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2514         dir, rrd_strerror(errno));
2515     return (-1);
2516   }
2517
2518   free(path_copy);
2519
2520   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2521       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2522   if (temp == NULL)
2523   {
2524     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2525     return (-1);
2526   }
2527   listen_fds = temp;
2528   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2529
2530   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2531   if (fd < 0)
2532   {
2533     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2534              rrd_strerror(errno));
2535     return (-1);
2536   }
2537
2538   memset (&sa, 0, sizeof (sa));
2539   sa.sun_family = AF_UNIX;
2540   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2541
2542   /* if we've gotten this far, we own the pid file.  any daemon started
2543    * with the same args must not be alive.  therefore, ensure that we can
2544    * create the socket...
2545    */
2546   unlink(path);
2547
2548   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2549   if (status != 0)
2550   {
2551     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2552              path, rrd_strerror(errno));
2553     close (fd);
2554     return (-1);
2555   }
2556
2557   /* tweak the sockets group ownership */
2558   if (sock->socket_group != (gid_t)-1)
2559   {
2560     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2561          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2562     {
2563       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2564     }
2565   }
2566
2567   if (sock->socket_permissions != (mode_t)-1)
2568   {
2569     if (chmod(path, sock->socket_permissions) != 0)
2570       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2571           (unsigned int)sock->socket_permissions, strerror(errno));
2572   }
2573
2574   status = listen (fd, /* backlog = */ 10);
2575   if (status != 0)
2576   {
2577     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2578              path, rrd_strerror(errno));
2579     close (fd);
2580     unlink (path);
2581     return (-1);
2582   }
2583
2584   listen_fds[listen_fds_num].fd = fd;
2585   listen_fds[listen_fds_num].family = PF_UNIX;
2586   strncpy(listen_fds[listen_fds_num].addr, path,
2587           sizeof (listen_fds[listen_fds_num].addr) - 1);
2588   listen_fds_num++;
2589
2590   return (0);
2591 } /* }}} int open_listen_socket_unix */
2592
2593 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2594 {
2595   struct addrinfo ai_hints;
2596   struct addrinfo *ai_res;
2597   struct addrinfo *ai_ptr;
2598   char addr_copy[NI_MAXHOST];
2599   char *addr;
2600   char *port;
2601   int status;
2602
2603   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2604   addr_copy[sizeof (addr_copy) - 1] = 0;
2605   addr = addr_copy;
2606
2607   memset (&ai_hints, 0, sizeof (ai_hints));
2608   ai_hints.ai_flags = 0;
2609 #ifdef AI_ADDRCONFIG
2610   ai_hints.ai_flags |= AI_ADDRCONFIG;
2611 #endif
2612   ai_hints.ai_family = AF_UNSPEC;
2613   ai_hints.ai_socktype = SOCK_STREAM;
2614
2615   port = NULL;
2616   if (*addr == '[') /* IPv6+port format */
2617   {
2618     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2619     addr++;
2620
2621     port = strchr (addr, ']');
2622     if (port == NULL)
2623     {
2624       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2625       return (-1);
2626     }
2627     *port = 0;
2628     port++;
2629
2630     if (*port == ':')
2631       port++;
2632     else if (*port == 0)
2633       port = NULL;
2634     else
2635     {
2636       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2637       return (-1);
2638     }
2639   } /* if (*addr == '[') */
2640   else
2641   {
2642     port = rindex(addr, ':');
2643     if (port != NULL)
2644     {
2645       *port = 0;
2646       port++;
2647     }
2648   }
2649   ai_res = NULL;
2650   status = getaddrinfo (addr,
2651                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2652                         &ai_hints, &ai_res);
2653   if (status != 0)
2654   {
2655     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2656              addr, gai_strerror (status));
2657     return (-1);
2658   }
2659
2660   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2661   {
2662     int fd;
2663     listen_socket_t *temp;
2664     int one = 1;
2665
2666     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2667         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2668     if (temp == NULL)
2669     {
2670       fprintf (stderr,
2671                "rrdcached: open_listen_socket_network: realloc failed.\n");
2672       continue;
2673     }
2674     listen_fds = temp;
2675     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2676
2677     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2678     if (fd < 0)
2679     {
2680       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2681                rrd_strerror(errno));
2682       continue;
2683     }
2684
2685     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2686
2687     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2688     if (status != 0)
2689     {
2690       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2691                sock->addr, rrd_strerror(errno));
2692       close (fd);
2693       continue;
2694     }
2695
2696     status = listen (fd, /* backlog = */ 10);
2697     if (status != 0)
2698     {
2699       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2700                sock->addr, rrd_strerror(errno));
2701       close (fd);
2702       freeaddrinfo(ai_res);
2703       return (-1);
2704     }
2705
2706     listen_fds[listen_fds_num].fd = fd;
2707     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2708     listen_fds_num++;
2709   } /* for (ai_ptr) */
2710
2711   freeaddrinfo(ai_res);
2712   return (0);
2713 } /* }}} static int open_listen_socket_network */
2714
2715 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2716 {
2717   assert(sock != NULL);
2718   assert(sock->addr != NULL);
2719
2720   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2721       || sock->addr[0] == '/')
2722     return (open_listen_socket_unix(sock));
2723   else
2724     return (open_listen_socket_network(sock));
2725 } /* }}} int open_listen_socket */
2726
2727 static int close_listen_sockets (void) /* {{{ */
2728 {
2729   size_t i;
2730
2731   for (i = 0; i < listen_fds_num; i++)
2732   {
2733     close (listen_fds[i].fd);
2734
2735     if (listen_fds[i].family == PF_UNIX)
2736       unlink(listen_fds[i].addr);
2737   }
2738
2739   free (listen_fds);
2740   listen_fds = NULL;
2741   listen_fds_num = 0;
2742
2743   return (0);
2744 } /* }}} int close_listen_sockets */
2745
2746 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2747 {
2748   struct pollfd *pollfds;
2749   int pollfds_num;
2750   int status;
2751   int i;
2752
2753   if (listen_fds_num < 1)
2754   {
2755     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2756     return (NULL);
2757   }
2758
2759   pollfds_num = listen_fds_num;
2760   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2761   if (pollfds == NULL)
2762   {
2763     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2764     return (NULL);
2765   }
2766   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2767
2768   RRDD_LOG(LOG_INFO, "listening for connections");
2769
2770   while (state == RUNNING)
2771   {
2772     for (i = 0; i < pollfds_num; i++)
2773     {
2774       pollfds[i].fd = listen_fds[i].fd;
2775       pollfds[i].events = POLLIN | POLLPRI;
2776       pollfds[i].revents = 0;
2777     }
2778
2779     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2780     if (state != RUNNING)
2781       break;
2782     else if (status == 0) /* timeout */
2783       continue;
2784     else if (status < 0) /* error */
2785     {
2786       status = errno;
2787       if (status != EINTR)
2788       {
2789         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2790       }
2791       continue;
2792     }
2793
2794     for (i = 0; i < pollfds_num; i++)
2795     {
2796       listen_socket_t *client_sock;
2797       struct sockaddr_storage client_sa;
2798       socklen_t client_sa_size;
2799       pthread_t tid;
2800       pthread_attr_t attr;
2801
2802       if (pollfds[i].revents == 0)
2803         continue;
2804
2805       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2806       {
2807         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2808             "poll(2) returned something unexpected for listen FD #%i.",
2809             pollfds[i].fd);
2810         continue;
2811       }
2812
2813       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2814       if (client_sock == NULL)
2815       {
2816         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2817         continue;
2818       }
2819       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2820
2821       client_sa_size = sizeof (client_sa);
2822       client_sock->fd = accept (pollfds[i].fd,
2823           (struct sockaddr *) &client_sa, &client_sa_size);
2824       if (client_sock->fd < 0)
2825       {
2826         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2827         free(client_sock);
2828         continue;
2829       }
2830
2831       pthread_attr_init (&attr);
2832       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2833
2834       status = pthread_create (&tid, &attr, connection_thread_main,
2835                                client_sock);
2836       if (status != 0)
2837       {
2838         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2839         close_connection(client_sock);
2840         continue;
2841       }
2842     } /* for (pollfds_num) */
2843   } /* while (state == RUNNING) */
2844
2845   RRDD_LOG(LOG_INFO, "starting shutdown");
2846
2847   close_listen_sockets ();
2848
2849   pthread_mutex_lock (&connection_threads_lock);
2850   while (connection_threads_num > 0)
2851     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2852   pthread_mutex_unlock (&connection_threads_lock);
2853
2854   free(pollfds);
2855
2856   return (NULL);
2857 } /* }}} void *listen_thread_main */
2858
2859 static int daemonize (void) /* {{{ */
2860 {
2861   int pid_fd;
2862   char *base_dir;
2863
2864   daemon_uid = geteuid();
2865
2866   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2867   if (pid_fd < 0)
2868     pid_fd = check_pidfile();
2869   if (pid_fd < 0)
2870     return pid_fd;
2871
2872   /* open all the listen sockets */
2873   if (config_listen_address_list_len > 0)
2874   {
2875     for (size_t i = 0; i < config_listen_address_list_len; i++)
2876       open_listen_socket (config_listen_address_list[i]);
2877
2878     rrd_free_ptrs((void ***) &config_listen_address_list,
2879                   &config_listen_address_list_len);
2880   }
2881   else
2882   {
2883     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
2884         sizeof(default_socket.addr) - 1);
2885     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
2886     open_listen_socket (&default_socket);
2887   }
2888
2889   if (listen_fds_num < 1)
2890   {
2891     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2892     goto error;
2893   }
2894
2895   if (!stay_foreground)
2896   {
2897     pid_t child;
2898
2899     child = fork ();
2900     if (child < 0)
2901     {
2902       fprintf (stderr, "daemonize: fork(2) failed.\n");
2903       goto error;
2904     }
2905     else if (child > 0)
2906       exit(0);
2907
2908     /* Become session leader */
2909     setsid ();
2910
2911     /* Open the first three file descriptors to /dev/null */
2912     close (2);
2913     close (1);
2914     close (0);
2915
2916     open ("/dev/null", O_RDWR);
2917     if (dup(0) == -1 || dup(0) == -1){
2918         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2919     }
2920   } /* if (!stay_foreground) */
2921
2922   /* Change into the /tmp directory. */
2923   base_dir = (config_base_dir != NULL)
2924     ? config_base_dir
2925     : "/tmp";
2926
2927   if (chdir (base_dir) != 0)
2928   {
2929     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2930     goto error;
2931   }
2932
2933   install_signal_handlers();
2934
2935   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2936   RRDD_LOG(LOG_INFO, "starting up");
2937
2938   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2939                                 (GDestroyNotify) free_cache_item);
2940   if (cache_tree == NULL)
2941   {
2942     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2943     goto error;
2944   }
2945
2946   return write_pidfile (pid_fd);
2947
2948 error:
2949   remove_pidfile();
2950   return -1;
2951 } /* }}} int daemonize */
2952
2953 static int cleanup (void) /* {{{ */
2954 {
2955   pthread_cond_broadcast (&flush_cond);
2956   pthread_join (flush_thread, NULL);
2957
2958   pthread_cond_broadcast (&queue_cond);
2959   for (int i = 0; i < config_queue_threads; i++)
2960     pthread_join (queue_threads[i], NULL);
2961
2962   if (config_flush_at_shutdown)
2963   {
2964     assert(cache_queue_head == NULL);
2965     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2966   }
2967
2968   free(queue_threads);
2969   free(config_base_dir);
2970
2971   pthread_mutex_lock(&cache_lock);
2972   g_tree_destroy(cache_tree);
2973
2974   pthread_mutex_lock(&journal_lock);
2975   journal_done();
2976
2977   RRDD_LOG(LOG_INFO, "goodbye");
2978   closelog ();
2979
2980   remove_pidfile ();
2981   free(config_pid_file);
2982
2983   return (0);
2984 } /* }}} int cleanup */
2985
2986 static int read_options (int argc, char **argv) /* {{{ */
2987 {
2988   int option;
2989   int status = 0;
2990
2991   socket_permission_clear (&default_socket);
2992
2993   default_socket.socket_group = (gid_t)-1;
2994   default_socket.socket_permissions = (mode_t)-1;
2995
2996   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2997   {
2998     switch (option)
2999     {
3000       case 'g':
3001         stay_foreground=1;
3002         break;
3003
3004       case 'l':
3005       {
3006         listen_socket_t *new;
3007
3008         new = malloc(sizeof(listen_socket_t));
3009         if (new == NULL)
3010         {
3011           fprintf(stderr, "read_options: malloc failed.\n");
3012           return(2);
3013         }
3014         memset(new, 0, sizeof(listen_socket_t));
3015
3016         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3017
3018         /* Add permissions to the socket {{{ */
3019         if (default_socket.permissions != 0)
3020         {
3021           socket_permission_copy (new, &default_socket);
3022         }
3023         else /* if (default_socket.permissions == 0) */
3024         {
3025           /* Add permission for ALL commands to the socket. */
3026           size_t i;
3027           for (i = 0; i < list_of_commands_len; i++)
3028           {
3029             status = socket_permission_add (new, list_of_commands[i].cmd);
3030             if (status != 0)
3031             {
3032               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3033                   "socket failed. This should never happen, ever! Sorry.\n",
3034                   list_of_commands[i].cmd);
3035               status = 4;
3036             }
3037           }
3038         }
3039         /* }}} Done adding permissions. */
3040
3041         new->socket_group = default_socket.socket_group;
3042         new->socket_permissions = default_socket.socket_permissions;
3043
3044         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3045                          &config_listen_address_list_len, new))
3046         {
3047           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3048           return (2);
3049         }
3050       }
3051       break;
3052
3053       /* set socket group permissions */
3054       case 's':
3055       {
3056         gid_t group_gid;
3057         struct group *grp;
3058
3059         group_gid = strtoul(optarg, NULL, 10);
3060         if (errno != EINVAL && group_gid>0)
3061         {
3062           /* we were passed a number */
3063           grp = getgrgid(group_gid);
3064         }
3065         else
3066         {
3067           grp = getgrnam(optarg);
3068         }
3069
3070         if (grp)
3071         {
3072           default_socket.socket_group = grp->gr_gid;
3073         }
3074         else
3075         {
3076           /* no idea what the user wanted... */
3077           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3078           return (5);
3079         }
3080       }
3081       break;
3082
3083       /* set socket file permissions */
3084       case 'm':
3085       {
3086         long  tmp;
3087         char *endptr = NULL;
3088
3089         tmp = strtol (optarg, &endptr, 8);
3090         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3091             || (tmp > 07777) || (tmp < 0)) {
3092           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3093               optarg);
3094           return (5);
3095         }
3096
3097         default_socket.socket_permissions = (mode_t)tmp;
3098       }
3099       break;
3100
3101       case 'P':
3102       {
3103         char *optcopy;
3104         char *saveptr;
3105         char *dummy;
3106         char *ptr;
3107
3108         socket_permission_clear (&default_socket);
3109
3110         optcopy = strdup (optarg);
3111         dummy = optcopy;
3112         saveptr = NULL;
3113         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3114         {
3115           dummy = NULL;
3116           status = socket_permission_add (&default_socket, ptr);
3117           if (status != 0)
3118           {
3119             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3120                 "socket failed. Most likely, this permission doesn't "
3121                 "exist. Check your command line.\n", ptr);
3122             status = 4;
3123           }
3124         }
3125
3126         free (optcopy);
3127       }
3128       break;
3129
3130       case 'f':
3131       {
3132         int temp;
3133
3134         temp = atoi (optarg);
3135         if (temp > 0)
3136           config_flush_interval = temp;
3137         else
3138         {
3139           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3140           status = 3;
3141         }
3142       }
3143       break;
3144
3145       case 'w':
3146       {
3147         int temp;
3148
3149         temp = atoi (optarg);
3150         if (temp > 0)
3151           config_write_interval = temp;
3152         else
3153         {
3154           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3155           status = 2;
3156         }
3157       }
3158       break;
3159
3160       case 'z':
3161       {
3162         int temp;
3163
3164         temp = atoi(optarg);
3165         if (temp > 0)
3166           config_write_jitter = temp;
3167         else
3168         {
3169           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3170           status = 2;
3171         }
3172
3173         break;
3174       }
3175
3176       case 't':
3177       {
3178         int threads;
3179         threads = atoi(optarg);
3180         if (threads >= 1)
3181           config_queue_threads = threads;
3182         else
3183         {
3184           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3185           return 1;
3186         }
3187       }
3188       break;
3189
3190       case 'B':
3191         config_write_base_only = 1;
3192         break;
3193
3194       case 'b':
3195       {
3196         size_t len;
3197         char base_realpath[PATH_MAX];
3198
3199         if (config_base_dir != NULL)
3200           free (config_base_dir);
3201         config_base_dir = strdup (optarg);
3202         if (config_base_dir == NULL)
3203         {
3204           fprintf (stderr, "read_options: strdup failed.\n");
3205           return (3);
3206         }
3207
3208         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3209         {
3210           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3211               config_base_dir, rrd_strerror (errno));
3212           return (3);
3213         }
3214
3215         /* make sure that the base directory is not resolved via
3216          * symbolic links.  this makes some performance-enhancing
3217          * assumptions possible (we don't have to resolve paths
3218          * that start with a "/")
3219          */
3220         if (realpath(config_base_dir, base_realpath) == NULL)
3221         {
3222           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3223               "%s\n", config_base_dir, rrd_strerror(errno));
3224           return 5;
3225         }
3226
3227         len = strlen (config_base_dir);
3228         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3229         {
3230           config_base_dir[len - 1] = 0;
3231           len--;
3232         }
3233
3234         if (len < 1)
3235         {
3236           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3237           return (4);
3238         }
3239
3240         _config_base_dir_len = len;
3241
3242         len = strlen (base_realpath);
3243         while ((len > 0) && (base_realpath[len - 1] == '/'))
3244         {
3245           base_realpath[len - 1] = '\0';
3246           len--;
3247         }
3248
3249         if (strncmp(config_base_dir,
3250                          base_realpath, sizeof(base_realpath)) != 0)
3251         {
3252           fprintf(stderr,
3253                   "Base directory (-b) resolved via file system links!\n"
3254                   "Please consult rrdcached '-b' documentation!\n"
3255                   "Consider specifying the real directory (%s)\n",
3256                   base_realpath);
3257           return 5;
3258         }
3259       }
3260       break;
3261
3262       case 'p':
3263       {
3264         if (config_pid_file != NULL)
3265           free (config_pid_file);
3266         config_pid_file = strdup (optarg);
3267         if (config_pid_file == NULL)
3268         {
3269           fprintf (stderr, "read_options: strdup failed.\n");
3270           return (3);
3271         }
3272       }
3273       break;
3274
3275       case 'F':
3276         config_flush_at_shutdown = 1;
3277         break;
3278
3279       case 'j':
3280       {
3281         char journal_dir_actual[PATH_MAX];
3282         const char *dir;
3283         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3284
3285         status = rrd_mkdir_p(dir, 0777);
3286         if (status != 0)
3287         {
3288           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3289               dir, rrd_strerror(errno));
3290           return 6;
3291         }
3292
3293         if (access(dir, R_OK|W_OK|X_OK) != 0)
3294         {
3295           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3296                   errno ? rrd_strerror(errno) : "");
3297           return 6;
3298         }
3299       }
3300       break;
3301
3302       case 'a':
3303       {
3304         int temp = atoi(optarg);
3305         if (temp > 0)
3306           config_alloc_chunk = temp;
3307         else
3308         {
3309           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3310           return 10;
3311         }
3312       }
3313       break;
3314
3315       case 'h':
3316       case '?':
3317         printf ("RRDCacheD %s\n"
3318             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3319             "\n"
3320             "Usage: rrdcached [options]\n"
3321             "\n"
3322             "Valid options are:\n"
3323             "  -l <address>  Socket address to listen to.\n"
3324             "  -P <perms>    Sets the permissions to assign to all following "
3325                             "sockets\n"
3326             "  -w <seconds>  Interval in which to write data.\n"
3327             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3328             "  -t <threads>  Number of write threads.\n"
3329             "  -f <seconds>  Interval in which to flush dead data.\n"
3330             "  -p <file>     Location of the PID-file.\n"
3331             "  -b <dir>      Base directory to change to.\n"
3332             "  -B            Restrict file access to paths within -b <dir>\n"
3333             "  -g            Do not fork and run in the foreground.\n"
3334             "  -j <dir>      Directory in which to create the journal files.\n"
3335             "  -F            Always flush all updates at shutdown\n"
3336             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3337             "                (the socket will also have read/write permissions "
3338                             "for that group)\n"
3339             "  -m <mode>     File permissions (octal) of all following UNIX "
3340                             "sockets\n"
3341             "  -a <size>     Memory allocation chunk size. Default is 1."
3342             "\n"
3343             "For more information and a detailed description of all options "
3344             "please refer\n"
3345             "to the rrdcached(1) manual page.\n",
3346             VERSION);
3347         if (option == 'h')
3348           status = -1;
3349         else
3350           status = 1;
3351         break;
3352     } /* switch (option) */
3353   } /* while (getopt) */
3354
3355   /* advise the user when values are not sane */
3356   if (config_flush_interval < 2 * config_write_interval)
3357     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3358             " 2x write interval (-w) !\n");
3359   if (config_write_jitter > config_write_interval)
3360     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3361             " write interval (-w) !\n");
3362
3363   if (config_write_base_only && config_base_dir == NULL)
3364     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3365             "  Consult the rrdcached documentation\n");
3366
3367   if (journal_dir == NULL)
3368     config_flush_at_shutdown = 1;
3369
3370   return (status);
3371 } /* }}} int read_options */
3372
3373 int main (int argc, char **argv)
3374 {
3375   int status;
3376
3377   status = read_options (argc, argv);
3378   if (status != 0)
3379   {
3380     if (status < 0)
3381       status = 0;
3382     return (status);
3383   }
3384
3385   status = daemonize ();
3386   if (status != 0)
3387   {
3388     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3389     return (1);
3390   }
3391
3392   journal_init();
3393
3394   /* start the queue threads */
3395   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3396   if (queue_threads == NULL)
3397   {
3398     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3399     cleanup();
3400     return (1);
3401   }
3402   for (int i = 0; i < config_queue_threads; i++)
3403   {
3404     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3405     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3406     if (status != 0)
3407     {
3408       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3409       cleanup();
3410       return (1);
3411     }
3412   }
3413
3414   /* start the flush thread */
3415   memset(&flush_thread, 0, sizeof(flush_thread));
3416   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3417   if (status != 0)
3418   {
3419     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3420     cleanup();
3421     return (1);
3422   }
3423
3424   listen_thread_main (NULL);
3425   cleanup ();
3426
3427   return (0);
3428 } /* int main */
3429
3430 /*
3431  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3432  */