2eb69301d179f8b5653ed599c3f99d80d4c7eb94
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
115
116 #include <glib-2.0/glib.h>
117 /* }}} */
118
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
126
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
131
132 struct listen_socket_s
133 {
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
137
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
141
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
146
147   char *wbuf;
148   ssize_t wbuf_len;
149
150   uint32_t permissions;
151
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
156
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
164
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
167
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
171
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
177
178   char *syntax;
179   char *help;
180 };
181
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   double last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
199
200 struct callback_flush_data_s
201 {
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
208
209 enum queue_side_e
210 {
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
215
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
221
222 #define RBUF_SIZE (RRD_CMD_MAX*2)
223
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
229
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
232
233 static listen_socket_t default_socket;
234
235 enum {
236   RUNNING,              /* normal operation */
237   FLUSHING,             /* flushing remaining values */
238   SHUTDOWN              /* shutting down */
239 } state = RUNNING;
240
241 static pthread_t *queue_threads;
242 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
243 static int config_queue_threads = 4;
244
245 static pthread_t flush_thread;
246 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
247
248 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
249 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
250 static int connection_threads_num = 0;
251
252 /* Cache stuff */
253 static GTree          *cache_tree = NULL;
254 static cache_item_t   *cache_queue_head = NULL;
255 static cache_item_t   *cache_queue_tail = NULL;
256 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 static int config_write_interval = 300;
259 static int config_write_jitter   = 0;
260 static int config_flush_interval = 3600;
261 static int config_flush_at_shutdown = 0;
262 static char *config_pid_file = NULL;
263 static char *config_base_dir = NULL;
264 static size_t _config_base_dir_len = 0;
265 static int config_write_base_only = 0;
266 static size_t config_alloc_chunk = 1;
267
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
270
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
279
280 static int opt_no_overwrite = 0; /* default for the daemon */
281
282 /* Journaled updates */
283 #define JOURNAL_REPLAY(s) ((s) == NULL)
284 #define JOURNAL_BASE "rrd.journal"
285 static journal_set *journal_cur = NULL;
286 static journal_set *journal_old = NULL;
287 static char *journal_dir = NULL;
288 static FILE *journal_fh = NULL;         /* current journal file handle */
289 static long  journal_size = 0;          /* current journal size */
290 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
291 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
292 static int journal_write(char *cmd, char *args);
293 static void journal_done(void);
294 static void journal_rotate(void);
295
296 /* prototypes for forward refernces */
297 static int handle_request_help (HANDLER_PROTO);
298
299 /* 
300  * Functions
301  */
302 static void sig_common (const char *sig) /* {{{ */
303 {
304   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
305   state = FLUSHING;
306   pthread_cond_broadcast(&flush_cond);
307   pthread_cond_broadcast(&queue_cond);
308 } /* }}} void sig_common */
309
310 static void sig_int_handler (int UNUSED(s)) /* {{{ */
311 {
312   sig_common("INT");
313 } /* }}} void sig_int_handler */
314
315 static void sig_term_handler (int UNUSED(s)) /* {{{ */
316 {
317   sig_common("TERM");
318 } /* }}} void sig_term_handler */
319
320 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
321 {
322   config_flush_at_shutdown = 1;
323   sig_common("USR1");
324 } /* }}} void sig_usr1_handler */
325
326 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
327 {
328   config_flush_at_shutdown = 0;
329   sig_common("USR2");
330 } /* }}} void sig_usr2_handler */
331
332 static void install_signal_handlers(void) /* {{{ */
333 {
334   /* These structures are static, because `sigaction' behaves weird if the are
335    * overwritten.. */
336   static struct sigaction sa_int;
337   static struct sigaction sa_term;
338   static struct sigaction sa_pipe;
339   static struct sigaction sa_usr1;
340   static struct sigaction sa_usr2;
341
342   /* Install signal handlers */
343   memset (&sa_int, 0, sizeof (sa_int));
344   sa_int.sa_handler = sig_int_handler;
345   sigaction (SIGINT, &sa_int, NULL);
346
347   memset (&sa_term, 0, sizeof (sa_term));
348   sa_term.sa_handler = sig_term_handler;
349   sigaction (SIGTERM, &sa_term, NULL);
350
351   memset (&sa_pipe, 0, sizeof (sa_pipe));
352   sa_pipe.sa_handler = SIG_IGN;
353   sigaction (SIGPIPE, &sa_pipe, NULL);
354
355   memset (&sa_pipe, 0, sizeof (sa_usr1));
356   sa_usr1.sa_handler = sig_usr1_handler;
357   sigaction (SIGUSR1, &sa_usr1, NULL);
358
359   memset (&sa_usr2, 0, sizeof (sa_usr2));
360   sa_usr2.sa_handler = sig_usr2_handler;
361   sigaction (SIGUSR2, &sa_usr2, NULL);
362
363 } /* }}} void install_signal_handlers */
364
365 static int open_pidfile(char *action, int oflag) /* {{{ */
366 {
367   int fd;
368   const char *file;
369   char *file_copy, *dir;
370
371   file = (config_pid_file != NULL)
372     ? config_pid_file
373     : LOCALSTATEDIR "/run/rrdcached.pid";
374
375   /* dirname may modify its argument */
376   file_copy = strdup(file);
377   if (file_copy == NULL)
378   {
379     fprintf(stderr, "rrdcached: strdup(): %s\n",
380         rrd_strerror(errno));
381     return -1;
382   }
383
384   dir = dirname(file_copy);
385   if (rrd_mkdir_p(dir, 0777) != 0)
386   {
387     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
388         dir, rrd_strerror(errno));
389     return -1;
390   }
391
392   free(file_copy);
393
394   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
395   if (fd < 0)
396     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
397             action, file, rrd_strerror(errno));
398
399   return(fd);
400 } /* }}} static int open_pidfile */
401
402 /* check existing pid file to see whether a daemon is running */
403 static int check_pidfile(void)
404 {
405   int pid_fd;
406   pid_t pid;
407   char pid_str[16];
408
409   pid_fd = open_pidfile("open", O_RDWR);
410   if (pid_fd < 0)
411     return pid_fd;
412
413   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
414     return -1;
415
416   pid = atoi(pid_str);
417   if (pid <= 0)
418     return -1;
419
420   /* another running process that we can signal COULD be
421    * a competing rrdcached */
422   if (pid != getpid() && kill(pid, 0) == 0)
423   {
424     fprintf(stderr,
425             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
426     close(pid_fd);
427     return -1;
428   }
429
430   lseek(pid_fd, 0, SEEK_SET);
431   if (ftruncate(pid_fd, 0) == -1)
432   {
433     fprintf(stderr,
434             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
435     close(pid_fd);
436     return -1;
437   }
438
439   fprintf(stderr,
440           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
441           "rrdcached: starting normally.\n", pid);
442
443   return pid_fd;
444 } /* }}} static int check_pidfile */
445
446 static int write_pidfile (int fd) /* {{{ */
447 {
448   pid_t pid;
449   FILE *fh;
450
451   pid = getpid ();
452
453   fh = fdopen (fd, "w");
454   if (fh == NULL)
455   {
456     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
457     close(fd);
458     return (-1);
459   }
460
461   fprintf (fh, "%i\n", (int) pid);
462   fclose (fh);
463
464   return (0);
465 } /* }}} int write_pidfile */
466
467 static int remove_pidfile (void) /* {{{ */
468 {
469   char *file;
470   int status;
471
472   file = (config_pid_file != NULL)
473     ? config_pid_file
474     : LOCALSTATEDIR "/run/rrdcached.pid";
475
476   status = unlink (file);
477   if (status == 0)
478     return (0);
479   return (errno);
480 } /* }}} int remove_pidfile */
481
482 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
483 {
484   char *eol;
485
486   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
487                sock->next_read - sock->next_cmd);
488
489   if (eol == NULL)
490   {
491     /* no commands left, move remainder back to front of rbuf */
492     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
493             sock->next_read - sock->next_cmd);
494     sock->next_read -= sock->next_cmd;
495     sock->next_cmd = 0;
496     *len = 0;
497     return NULL;
498   }
499   else
500   {
501     char *cmd = sock->rbuf + sock->next_cmd;
502     *eol = '\0';
503
504     sock->next_cmd = eol - sock->rbuf + 1;
505
506     if (eol > sock->rbuf && *(eol-1) == '\r')
507       *(--eol) = '\0'; /* handle "\r\n" EOL */
508
509     *len = eol - cmd;
510
511     return cmd;
512   }
513
514   /* NOTREACHED */
515   assert(1==0);
516 } /* }}} char *next_cmd */
517
518 /* add the characters directly to the write buffer */
519 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
520 {
521   char *new_buf;
522
523   assert(sock != NULL);
524
525   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526   if (new_buf == NULL)
527   {
528     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
529     return -1;
530   }
531
532   strncpy(new_buf + sock->wbuf_len, str, len + 1);
533
534   sock->wbuf = new_buf;
535   sock->wbuf_len += len;
536
537   return 0;
538 } /* }}} static int add_to_wbuf */
539
540 /* add the text to the "extra" info that's sent after the status line */
541 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
542 {
543   va_list argp;
544   char buffer[RRD_CMD_MAX];
545   int len;
546
547   if (JOURNAL_REPLAY(sock)) return 0;
548   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
549
550   va_start(argp, fmt);
551 #ifdef HAVE_VSNPRINTF
552   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
553 #else
554   len = vsprintf(buffer, fmt, argp);
555 #endif
556   va_end(argp);
557   if (len < 0)
558   {
559     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
560     return -1;
561   }
562
563   return add_to_wbuf(sock, buffer, len);
564 } /* }}} static int add_response_info */
565
566 static int count_lines(char *str) /* {{{ */
567 {
568   int lines = 0;
569
570   if (str != NULL)
571   {
572     while ((str = strchr(str, '\n')) != NULL)
573     {
574       ++lines;
575       ++str;
576     }
577   }
578
579   return lines;
580 } /* }}} static int count_lines */
581
582 /* send the response back to the user.
583  * returns 0 on success, -1 on error
584  * write buffer is always zeroed after this call */
585 static int send_response (listen_socket_t *sock, response_code rc,
586                           char *fmt, ...) /* {{{ */
587 {
588   va_list argp;
589   char buffer[RRD_CMD_MAX];
590   int lines;
591   ssize_t wrote;
592   int rclen, len;
593
594   if (JOURNAL_REPLAY(sock)) return rc;
595
596   if (sock->batch_start)
597   {
598     if (rc == RESP_OK)
599       return rc; /* no response on success during BATCH */
600     lines = sock->batch_cmd;
601   }
602   else if (rc == RESP_OK)
603     lines = count_lines(sock->wbuf);
604   else
605     lines = -1;
606
607   rclen = sprintf(buffer, "%d ", lines);
608   va_start(argp, fmt);
609 #ifdef HAVE_VSNPRINTF
610   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
611 #else
612   len = vsprintf(buffer+rclen, fmt, argp);
613 #endif
614   va_end(argp);
615   if (len < 0)
616     return -1;
617
618   len += rclen;
619
620   /* append the result to the wbuf, don't write to the user */
621   if (sock->batch_start)
622     return add_to_wbuf(sock, buffer, len);
623
624   /* first write must be complete */
625   if (len != write(sock->fd, buffer, len))
626   {
627     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
628     return -1;
629   }
630
631   if (sock->wbuf != NULL && rc == RESP_OK)
632   {
633     wrote = 0;
634     while (wrote < sock->wbuf_len)
635     {
636       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637       if (wb <= 0)
638       {
639         RRDD_LOG(LOG_INFO, "send_response: could not write results");
640         return -1;
641       }
642       wrote += wb;
643     }
644   }
645
646   free(sock->wbuf); sock->wbuf = NULL;
647   sock->wbuf_len = 0;
648
649   return 0;
650 } /* }}} */
651
652 static void wipe_ci_values(cache_item_t *ci, time_t when)
653 {
654   ci->values = NULL;
655   ci->values_num = 0;
656   ci->values_alloc = 0;
657
658   ci->last_flush_time = when;
659   if (config_write_jitter > 0)
660     ci->last_flush_time += (rrd_random() % config_write_jitter);
661 }
662
663 /* remove_from_queue
664  * remove a "cache_item_t" item from the queue.
665  * must hold 'cache_lock' when calling this
666  */
667 static void remove_from_queue(cache_item_t *ci) /* {{{ */
668 {
669   if (ci == NULL) return;
670   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
671
672   if (ci->prev == NULL)
673     cache_queue_head = ci->next; /* reset head */
674   else
675     ci->prev->next = ci->next;
676
677   if (ci->next == NULL)
678     cache_queue_tail = ci->prev; /* reset the tail */
679   else
680     ci->next->prev = ci->prev;
681
682   ci->next = ci->prev = NULL;
683   ci->flags &= ~CI_FLAGS_IN_QUEUE;
684
685   pthread_mutex_lock (&stats_lock);
686   assert (stats_queue_length > 0);
687   stats_queue_length--;
688   pthread_mutex_unlock (&stats_lock);
689
690 } /* }}} static void remove_from_queue */
691
692 /* free the resources associated with the cache_item_t
693  * must hold cache_lock when calling this function
694  */
695 static void *free_cache_item(cache_item_t *ci) /* {{{ */
696 {
697   if (ci == NULL) return NULL;
698
699   remove_from_queue(ci);
700
701   for (size_t i=0; i < ci->values_num; i++)
702     free(ci->values[i]);
703
704   free (ci->values);
705   free (ci->file);
706
707   /* in case anyone is waiting */
708   pthread_cond_broadcast(&ci->flushed);
709   pthread_cond_destroy(&ci->flushed);
710
711   free (ci);
712
713   return NULL;
714 } /* }}} static void *free_cache_item */
715
716 /*
717  * enqueue_cache_item:
718  * `cache_lock' must be acquired before calling this function!
719  */
720 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
721     queue_side_t side)
722 {
723   if (ci == NULL)
724     return (-1);
725
726   if (ci->values_num == 0)
727     return (0);
728
729   if (side == HEAD)
730   {
731     if (cache_queue_head == ci)
732       return 0;
733
734     /* remove if further down in queue */
735     remove_from_queue(ci);
736
737     ci->prev = NULL;
738     ci->next = cache_queue_head;
739     if (ci->next != NULL)
740       ci->next->prev = ci;
741     cache_queue_head = ci;
742
743     if (cache_queue_tail == NULL)
744       cache_queue_tail = cache_queue_head;
745   }
746   else /* (side == TAIL) */
747   {
748     /* We don't move values back in the list.. */
749     if (ci->flags & CI_FLAGS_IN_QUEUE)
750       return (0);
751
752     assert (ci->next == NULL);
753     assert (ci->prev == NULL);
754
755     ci->prev = cache_queue_tail;
756
757     if (cache_queue_tail == NULL)
758       cache_queue_head = ci;
759     else
760       cache_queue_tail->next = ci;
761
762     cache_queue_tail = ci;
763   }
764
765   ci->flags |= CI_FLAGS_IN_QUEUE;
766
767   pthread_cond_signal(&queue_cond);
768   pthread_mutex_lock (&stats_lock);
769   stats_queue_length++;
770   pthread_mutex_unlock (&stats_lock);
771
772   return (0);
773 } /* }}} int enqueue_cache_item */
774
775 /*
776  * tree_callback_flush:
777  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
778  * while this is in progress.
779  */
780 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
781     gpointer data)
782 {
783   cache_item_t *ci;
784   callback_flush_data_t *cfd;
785
786   ci = (cache_item_t *) value;
787   cfd = (callback_flush_data_t *) data;
788
789   if (ci->flags & CI_FLAGS_IN_QUEUE)
790     return FALSE;
791
792   if (ci->values_num > 0
793       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
794   {
795     enqueue_cache_item (ci, TAIL);
796   }
797   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
798       && (ci->values_num <= 0))
799   {
800     assert ((char *) key == ci->file);
801     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
802     {
803       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
804       return (FALSE);
805     }
806   }
807
808   return (FALSE);
809 } /* }}} gboolean tree_callback_flush */
810
811 static int flush_old_values (int max_age)
812 {
813   callback_flush_data_t cfd;
814   size_t k;
815
816   memset (&cfd, 0, sizeof (cfd));
817   /* Pass the current time as user data so that we don't need to call
818    * `time' for each node. */
819   cfd.now = time (NULL);
820   cfd.keys = NULL;
821   cfd.keys_num = 0;
822
823   if (max_age > 0)
824     cfd.abs_timeout = cfd.now - max_age;
825   else
826     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
827
828   /* `tree_callback_flush' will return the keys of all values that haven't
829    * been touched in the last `config_flush_interval' seconds in `cfd'.
830    * The char*'s in this array point to the same memory as ci->file, so we
831    * don't need to free them separately. */
832   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
833
834   for (k = 0; k < cfd.keys_num; k++)
835   {
836     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
837     /* should never fail, since we have held the cache_lock
838      * the entire time */
839     assert(status == TRUE);
840   }
841
842   if (cfd.keys != NULL)
843   {
844     free (cfd.keys);
845     cfd.keys = NULL;
846   }
847
848   return (0);
849 } /* int flush_old_values */
850
851 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
852 {
853   struct timeval now;
854   struct timespec next_flush;
855   int status;
856
857   gettimeofday (&now, NULL);
858   next_flush.tv_sec = now.tv_sec + config_flush_interval;
859   next_flush.tv_nsec = 1000 * now.tv_usec;
860
861   pthread_mutex_lock(&cache_lock);
862
863   while (state == RUNNING)
864   {
865     gettimeofday (&now, NULL);
866     if ((now.tv_sec > next_flush.tv_sec)
867         || ((now.tv_sec == next_flush.tv_sec)
868           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
869     {
870       RRDD_LOG(LOG_DEBUG, "flushing old values");
871
872       /* Determine the time of the next cache flush. */
873       next_flush.tv_sec = now.tv_sec + config_flush_interval;
874
875       /* Flush all values that haven't been written in the last
876        * `config_write_interval' seconds. */
877       flush_old_values (config_write_interval);
878
879       /* unlock the cache while we rotate so we don't block incoming
880        * updates if the fsync() blocks on disk I/O */
881       pthread_mutex_unlock(&cache_lock);
882       journal_rotate();
883       pthread_mutex_lock(&cache_lock);
884     }
885
886     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
887     if (status != 0 && status != ETIMEDOUT)
888     {
889       RRDD_LOG (LOG_ERR, "flush_thread_main: "
890                 "pthread_cond_timedwait returned %i.", status);
891     }
892   }
893
894   if (config_flush_at_shutdown)
895     flush_old_values (-1); /* flush everything */
896
897   state = SHUTDOWN;
898
899   pthread_mutex_unlock(&cache_lock);
900
901   return NULL;
902 } /* void *flush_thread_main */
903
904 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
905 {
906   pthread_mutex_lock (&cache_lock);
907
908   while (state != SHUTDOWN
909          || (cache_queue_head != NULL && config_flush_at_shutdown))
910   {
911     cache_item_t *ci;
912     char *file;
913     char **values;
914     size_t values_num;
915     int status;
916
917     /* Now, check if there's something to store away. If not, wait until
918      * something comes in. */
919     if (cache_queue_head == NULL)
920     {
921       status = pthread_cond_wait (&queue_cond, &cache_lock);
922       if ((status != 0) && (status != ETIMEDOUT))
923       {
924         RRDD_LOG (LOG_ERR, "queue_thread_main: "
925             "pthread_cond_wait returned %i.", status);
926       }
927     }
928
929     /* Check if a value has arrived. This may be NULL if we timed out or there
930      * was an interrupt such as a signal. */
931     if (cache_queue_head == NULL)
932       continue;
933
934     ci = cache_queue_head;
935
936     /* copy the relevant parts */
937     file = strdup (ci->file);
938     if (file == NULL)
939     {
940       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
941       continue;
942     }
943
944     assert(ci->values != NULL);
945     assert(ci->values_num > 0);
946
947     values = ci->values;
948     values_num = ci->values_num;
949
950     wipe_ci_values(ci, time(NULL));
951     remove_from_queue(ci);
952
953     pthread_mutex_unlock (&cache_lock);
954
955     rrd_clear_error ();
956     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957     if (status != 0)
958     {
959       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
960           "rrd_update_r (%s) failed with status %i. (%s)",
961           file, status, rrd_get_error());
962     }
963
964     journal_write("wrote", file);
965
966     /* Search again in the tree.  It's possible someone issued a "FORGET"
967      * while we were writing the update values. */
968     pthread_mutex_lock(&cache_lock);
969     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
970     if (ci)
971       pthread_cond_broadcast(&ci->flushed);
972     pthread_mutex_unlock(&cache_lock);
973
974     if (status == 0)
975     {
976       pthread_mutex_lock (&stats_lock);
977       stats_updates_written++;
978       stats_data_sets_written += values_num;
979       pthread_mutex_unlock (&stats_lock);
980     }
981
982     rrd_free_ptrs((void ***) &values, &values_num);
983     free(file);
984
985     pthread_mutex_lock (&cache_lock);
986   }
987   pthread_mutex_unlock (&cache_lock);
988
989   return (NULL);
990 } /* }}} void *queue_thread_main */
991
992 static int buffer_get_field (char **buffer_ret, /* {{{ */
993     size_t *buffer_size_ret, char **field_ret)
994 {
995   char *buffer;
996   size_t buffer_pos;
997   size_t buffer_size;
998   char *field;
999   size_t field_size;
1000   int status;
1001
1002   buffer = *buffer_ret;
1003   buffer_pos = 0;
1004   buffer_size = *buffer_size_ret;
1005   field = *buffer_ret;
1006   field_size = 0;
1007
1008   if (buffer_size <= 0)
1009     return (-1);
1010
1011   /* This is ensured by `handle_request'. */
1012   assert (buffer[buffer_size - 1] == '\0');
1013
1014   status = -1;
1015   while (buffer_pos < buffer_size)
1016   {
1017     /* Check for end-of-field or end-of-buffer */
1018     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1019     {
1020       field[field_size] = 0;
1021       field_size++;
1022       buffer_pos++;
1023       status = 0;
1024       break;
1025     }
1026     /* Handle escaped characters. */
1027     else if (buffer[buffer_pos] == '\\')
1028     {
1029       if (buffer_pos >= (buffer_size - 1))
1030         break;
1031       buffer_pos++;
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036     /* Normal operation */ 
1037     else
1038     {
1039       field[field_size] = buffer[buffer_pos];
1040       field_size++;
1041       buffer_pos++;
1042     }
1043   } /* while (buffer_pos < buffer_size) */
1044
1045   if (status != 0)
1046     return (status);
1047
1048   *buffer_ret = buffer + buffer_pos;
1049   *buffer_size_ret = buffer_size - buffer_pos;
1050   *field_ret = field;
1051
1052   return (0);
1053 } /* }}} int buffer_get_field */
1054
1055 /* if we're restricting writes to the base directory,
1056  * check whether the file falls within the dir
1057  * returns 1 if OK, otherwise 0
1058  */
1059 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1060 {
1061   assert(file != NULL);
1062
1063   if (!config_write_base_only
1064       || JOURNAL_REPLAY(sock)
1065       || config_base_dir == NULL)
1066     return 1;
1067
1068   if (strstr(file, "../") != NULL) goto err;
1069
1070   /* relative paths without "../" are ok */
1071   if (*file != '/') return 1;
1072
1073   /* file must be of the format base + "/" + <1+ char filename> */
1074   if (strlen(file) < _config_base_dir_len + 2) goto err;
1075   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1076   if (*(file + _config_base_dir_len) != '/') goto err;
1077
1078   return 1;
1079
1080 err:
1081   if (sock != NULL && sock->fd >= 0)
1082     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1083
1084   return 0;
1085 } /* }}} static int check_file_access */
1086
1087 /* when using a base dir, convert relative paths to absolute paths.
1088  * if necessary, modifies the "filename" pointer to point
1089  * to the new path created in "tmp".  "tmp" is provided
1090  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1091  *
1092  * this allows us to optimize for the expected case (absolute path)
1093  * with a no-op.
1094  */
1095 static void get_abs_path(char **filename, char *tmp)
1096 {
1097   assert(tmp != NULL);
1098   assert(filename != NULL && *filename != NULL);
1099
1100   if (config_base_dir == NULL || **filename == '/')
1101     return;
1102
1103   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1104   *filename = tmp;
1105 } /* }}} static int get_abs_path */
1106
1107 static int flush_file (const char *filename) /* {{{ */
1108 {
1109   cache_item_t *ci;
1110
1111   pthread_mutex_lock (&cache_lock);
1112
1113   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114   if (ci == NULL)
1115   {
1116     pthread_mutex_unlock (&cache_lock);
1117     return (ENOENT);
1118   }
1119
1120   if (ci->values_num > 0)
1121   {
1122     /* Enqueue at head */
1123     enqueue_cache_item (ci, HEAD);
1124     pthread_cond_wait(&ci->flushed, &cache_lock);
1125   }
1126
1127   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1128    * may have been purged during our cond_wait() */
1129
1130   pthread_mutex_unlock(&cache_lock);
1131
1132   return (0);
1133 } /* }}} int flush_file */
1134
1135 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1136 {
1137   char *err = "Syntax error.\n";
1138
1139   if (cmd && cmd->syntax)
1140     err = cmd->syntax;
1141
1142   return send_response(sock, RESP_ERR, "Usage: %s", err);
1143 } /* }}} static int syntax_error() */
1144
1145 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1146 {
1147   uint64_t copy_queue_length;
1148   uint64_t copy_updates_received;
1149   uint64_t copy_flush_received;
1150   uint64_t copy_updates_written;
1151   uint64_t copy_data_sets_written;
1152   uint64_t copy_journal_bytes;
1153   uint64_t copy_journal_rotate;
1154
1155   uint64_t tree_nodes_number;
1156   uint64_t tree_depth;
1157
1158   pthread_mutex_lock (&stats_lock);
1159   copy_queue_length       = stats_queue_length;
1160   copy_updates_received   = stats_updates_received;
1161   copy_flush_received     = stats_flush_received;
1162   copy_updates_written    = stats_updates_written;
1163   copy_data_sets_written  = stats_data_sets_written;
1164   copy_journal_bytes      = stats_journal_bytes;
1165   copy_journal_rotate     = stats_journal_rotate;
1166   pthread_mutex_unlock (&stats_lock);
1167
1168   pthread_mutex_lock (&cache_lock);
1169   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1170   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1171   pthread_mutex_unlock (&cache_lock);
1172
1173   add_response_info(sock,
1174                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1175   add_response_info(sock,
1176                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1177   add_response_info(sock,
1178                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1179   add_response_info(sock,
1180                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1181   add_response_info(sock,
1182                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1183   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1184   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1185   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1186   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1187
1188   send_response(sock, RESP_OK, "Statistics follow\n");
1189
1190   return (0);
1191 } /* }}} int handle_request_stats */
1192
1193 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1194 {
1195   char *file, file_tmp[PATH_MAX];
1196   int status;
1197
1198   status = buffer_get_field (&buffer, &buffer_size, &file);
1199   if (status != 0)
1200   {
1201     return syntax_error(sock,cmd);
1202   }
1203   else
1204   {
1205     pthread_mutex_lock(&stats_lock);
1206     stats_flush_received++;
1207     pthread_mutex_unlock(&stats_lock);
1208
1209     get_abs_path(&file, file_tmp);
1210     if (!check_file_access(file, sock)) return 0;
1211
1212     status = flush_file (file);
1213     if (status == 0)
1214       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1215     else if (status == ENOENT)
1216     {
1217       /* no file in our tree; see whether it exists at all */
1218       struct stat statbuf;
1219
1220       memset(&statbuf, 0, sizeof(statbuf));
1221       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1222         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1223       else
1224         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1225     }
1226     else if (status < 0)
1227       return send_response(sock, RESP_ERR, "Internal error.\n");
1228     else
1229       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1230   }
1231
1232   /* NOTREACHED */
1233   assert(1==0);
1234 } /* }}} int handle_request_flush */
1235
1236 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1237 {
1238   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1239
1240   pthread_mutex_lock(&cache_lock);
1241   flush_old_values(-1);
1242   pthread_mutex_unlock(&cache_lock);
1243
1244   return send_response(sock, RESP_OK, "Started flush.\n");
1245 } /* }}} static int handle_request_flushall */
1246
1247 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1248 {
1249   int status;
1250   char *file, file_tmp[PATH_MAX];
1251   cache_item_t *ci;
1252
1253   status = buffer_get_field(&buffer, &buffer_size, &file);
1254   if (status != 0)
1255     return syntax_error(sock,cmd);
1256
1257   get_abs_path(&file, file_tmp);
1258
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1266
1267   for (size_t i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1269
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1273
1274 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1275 {
1276   int status;
1277   gboolean found;
1278   char *file, file_tmp[PATH_MAX];
1279
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return syntax_error(sock,cmd);
1283
1284   get_abs_path(&file, file_tmp);
1285   if (!check_file_access(file, sock)) return 0;
1286
1287   pthread_mutex_lock(&cache_lock);
1288   found = g_tree_remove(cache_tree, file);
1289   pthread_mutex_unlock(&cache_lock);
1290
1291   if (found == TRUE)
1292   {
1293     if (!JOURNAL_REPLAY(sock))
1294       journal_write("forget", file);
1295
1296     return send_response(sock, RESP_OK, "Gone!\n");
1297   }
1298   else
1299     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1300
1301   /* NOTREACHED */
1302   assert(1==0);
1303 } /* }}} static int handle_request_forget */
1304
1305 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1306 {
1307   cache_item_t *ci;
1308
1309   pthread_mutex_lock(&cache_lock);
1310
1311   ci = cache_queue_head;
1312   while (ci != NULL)
1313   {
1314     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1315     ci = ci->next;
1316   }
1317
1318   pthread_mutex_unlock(&cache_lock);
1319
1320   return send_response(sock, RESP_OK, "in queue.\n");
1321 } /* }}} int handle_request_queue */
1322
1323 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1324 {
1325   char *file, file_tmp[PATH_MAX];
1326   int values_num = 0;
1327   int status;
1328   char orig_buf[RRD_CMD_MAX];
1329
1330   cache_item_t *ci;
1331
1332   /* save it for the journal later */
1333   if (!JOURNAL_REPLAY(sock))
1334     strncpy(orig_buf, buffer, buffer_size);
1335
1336   status = buffer_get_field (&buffer, &buffer_size, &file);
1337   if (status != 0)
1338     return syntax_error(sock,cmd);
1339
1340   pthread_mutex_lock(&stats_lock);
1341   stats_updates_received++;
1342   pthread_mutex_unlock(&stats_lock);
1343
1344   get_abs_path(&file, file_tmp);
1345   if (!check_file_access(file, sock)) return 0;
1346
1347   pthread_mutex_lock (&cache_lock);
1348   ci = g_tree_lookup (cache_tree, file);
1349
1350   if (ci == NULL) /* {{{ */
1351   {
1352     struct stat statbuf;
1353     cache_item_t *tmp;
1354
1355     /* don't hold the lock while we setup; stat(2) might block */
1356     pthread_mutex_unlock(&cache_lock);
1357
1358     memset (&statbuf, 0, sizeof (statbuf));
1359     status = stat (file, &statbuf);
1360     if (status != 0)
1361     {
1362       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1363
1364       status = errno;
1365       if (status == ENOENT)
1366         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1367       else
1368         return send_response(sock, RESP_ERR,
1369                              "stat failed with error %i.\n", status);
1370     }
1371     if (!S_ISREG (statbuf.st_mode))
1372       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1373
1374     if (access(file, R_OK|W_OK) != 0)
1375       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1376                            file, rrd_strerror(errno));
1377
1378     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379     if (ci == NULL)
1380     {
1381       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1382
1383       return send_response(sock, RESP_ERR, "malloc failed.\n");
1384     }
1385     memset (ci, 0, sizeof (cache_item_t));
1386
1387     ci->file = strdup (file);
1388     if (ci->file == NULL)
1389     {
1390       free (ci);
1391       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1392
1393       return send_response(sock, RESP_ERR, "strdup failed.\n");
1394     }
1395
1396     wipe_ci_values(ci, now);
1397     ci->flags = CI_FLAGS_IN_TREE;
1398     pthread_cond_init(&ci->flushed, NULL);
1399
1400     pthread_mutex_lock(&cache_lock);
1401
1402     /* another UPDATE might have added this entry in the meantime */
1403     tmp = g_tree_lookup (cache_tree, file);
1404     if (tmp == NULL)
1405       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406     else
1407     {
1408       free_cache_item (ci);
1409       ci = tmp;
1410     }
1411
1412     /* state may have changed while we were unlocked */
1413     if (state == SHUTDOWN)
1414       return -1;
1415   } /* }}} */
1416   assert (ci != NULL);
1417
1418   /* don't re-write updates in replay mode */
1419   if (!JOURNAL_REPLAY(sock))
1420     journal_write("update", orig_buf);
1421
1422   while (buffer_size > 0)
1423   {
1424     char *value;
1425     double stamp;
1426     char *eostamp;
1427
1428     status = buffer_get_field (&buffer, &buffer_size, &value);
1429     if (status != 0)
1430     {
1431       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1432       break;
1433     }
1434
1435     /* make sure update time is always moving forward. We use double here since
1436        update does support subsecond precision for timestamps ... */
1437     stamp = strtod(value, &eostamp);
1438     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1439     {
1440       pthread_mutex_unlock(&cache_lock);
1441       return send_response(sock, RESP_ERR,
1442                            "Cannot find timestamp in '%s'!\n", value);
1443     }
1444     else if (stamp <= ci->last_update_stamp)
1445     {
1446       pthread_mutex_unlock(&cache_lock);
1447       return send_response(sock, RESP_ERR,
1448                            "illegal attempt to update using time %lf when last"
1449                            " update time is %lf (minimum one second step)\n",
1450                            stamp, ci->last_update_stamp);
1451     }
1452     else
1453       ci->last_update_stamp = stamp;
1454
1455     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1456                               &ci->values_alloc, config_alloc_chunk))
1457     {
1458       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1459       continue;
1460     }
1461
1462     values_num++;
1463   }
1464
1465   if (((now - ci->last_flush_time) >= config_write_interval)
1466       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1467       && (ci->values_num > 0))
1468   {
1469     enqueue_cache_item (ci, TAIL);
1470   }
1471
1472   pthread_mutex_unlock (&cache_lock);
1473
1474   if (values_num < 1)
1475     return send_response(sock, RESP_ERR, "No values updated.\n");
1476   else
1477     return send_response(sock, RESP_OK,
1478                          "errors, enqueued %i value(s).\n", values_num);
1479
1480   /* NOTREACHED */
1481   assert(1==0);
1482
1483 } /* }}} int handle_request_update */
1484
1485 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1486 {
1487   char *file, file_tmp[PATH_MAX];
1488   char *cf;
1489
1490   char *start_str;
1491   char *end_str;
1492   time_t start_tm;
1493   time_t end_tm;
1494
1495   unsigned long step;
1496   unsigned long ds_cnt;
1497   char **ds_namv;
1498   rrd_value_t *data;
1499
1500   int status;
1501   unsigned long i;
1502   time_t t;
1503   rrd_value_t *data_ptr;
1504
1505   file = NULL;
1506   cf = NULL;
1507   start_str = NULL;
1508   end_str = NULL;
1509
1510   /* Read the arguments */
1511   do /* while (0) */
1512   {
1513     status = buffer_get_field (&buffer, &buffer_size, &file);
1514     if (status != 0)
1515       break;
1516
1517     status = buffer_get_field (&buffer, &buffer_size, &cf);
1518     if (status != 0)
1519       break;
1520
1521     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1522     if (status != 0)
1523     {
1524       start_str = NULL;
1525       status = 0;
1526       break;
1527     }
1528
1529     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1530     if (status != 0)
1531     {
1532       end_str = NULL;
1533       status = 0;
1534       break;
1535     }
1536   } while (0);
1537
1538   if (status != 0)
1539     return (syntax_error(sock,cmd));
1540
1541   get_abs_path(&file, file_tmp);
1542   if (!check_file_access(file, sock)) return 0;
1543
1544   status = flush_file (file);
1545   if ((status != 0) && (status != ENOENT))
1546     return (send_response (sock, RESP_ERR,
1547           "flush_file (%s) failed with status %i.\n", file, status));
1548
1549   t = time (NULL); /* "now" */
1550
1551   /* Parse start time */
1552   if (start_str != NULL)
1553   {
1554     char *endptr;
1555     long value;
1556
1557     endptr = NULL;
1558     errno = 0;
1559     value = strtol (start_str, &endptr, /* base = */ 0);
1560     if ((endptr == start_str) || (errno != 0))
1561       return (send_response(sock, RESP_ERR,
1562             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1563             start_str));
1564
1565     if (value > 0)
1566       start_tm = (time_t) value;
1567     else
1568       start_tm = (time_t) (t + value);
1569   }
1570   else
1571   {
1572     start_tm = t - 86400;
1573   }
1574
1575   /* Parse end time */
1576   if (end_str != NULL)
1577   {
1578     char *endptr;
1579     long value;
1580
1581     endptr = NULL;
1582     errno = 0;
1583     value = strtol (end_str, &endptr, /* base = */ 0);
1584     if ((endptr == end_str) || (errno != 0))
1585       return (send_response(sock, RESP_ERR,
1586             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1587             end_str));
1588
1589     if (value > 0)
1590       end_tm = (time_t) value;
1591     else
1592       end_tm = (time_t) (t + value);
1593   }
1594   else
1595   {
1596     end_tm = t;
1597   }
1598
1599   step = -1;
1600   ds_cnt = 0;
1601   ds_namv = NULL;
1602   data = NULL;
1603
1604   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1605       &ds_cnt, &ds_namv, &data);
1606   if (status != 0)
1607     return (send_response(sock, RESP_ERR,
1608           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1609
1610   add_response_info (sock, "FlushVersion: %lu\n", 1);
1611   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1612   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1613   add_response_info (sock, "Step: %lu\n", step);
1614   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1615
1616 #define SSTRCAT(buffer,str,buffer_fill) do { \
1617     size_t str_len = strlen (str); \
1618     if ((buffer_fill + str_len) > sizeof (buffer)) \
1619       str_len = sizeof (buffer) - buffer_fill; \
1620     if (str_len > 0) { \
1621       strncpy (buffer + buffer_fill, str, str_len); \
1622       buffer_fill += str_len; \
1623       assert (buffer_fill <= sizeof (buffer)); \
1624       if (buffer_fill == sizeof (buffer)) \
1625         buffer[buffer_fill - 1] = 0; \
1626       else \
1627         buffer[buffer_fill] = 0; \
1628     } \
1629   } while (0)
1630
1631   { /* Add list of DS names */
1632     char linebuf[1024];
1633     size_t linebuf_fill;
1634
1635     memset (linebuf, 0, sizeof (linebuf));
1636     linebuf_fill = 0;
1637     for (i = 0; i < ds_cnt; i++)
1638     {
1639       if (i > 0)
1640         SSTRCAT (linebuf, " ", linebuf_fill);
1641       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1642       rrd_freemem(ds_namv[i]);
1643     }
1644     rrd_freemem(ds_namv);
1645     add_response_info (sock, "DSName: %s\n", linebuf);
1646   }
1647
1648   /* Add the actual data */
1649   assert (step > 0);
1650   data_ptr = data;
1651   for (t = start_tm + step; t <= end_tm; t += step)
1652   {
1653     char linebuf[1024];
1654     size_t linebuf_fill;
1655     char tmp[128];
1656
1657     memset (linebuf, 0, sizeof (linebuf));
1658     linebuf_fill = 0;
1659     for (i = 0; i < ds_cnt; i++)
1660     {
1661       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1662       tmp[sizeof (tmp) - 1] = 0;
1663       SSTRCAT (linebuf, tmp, linebuf_fill);
1664
1665       data_ptr++;
1666     }
1667
1668     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1669   } /* for (t) */
1670   rrd_freemem(data);
1671
1672   return (send_response (sock, RESP_OK, "Success\n"));
1673 #undef SSTRCAT
1674 } /* }}} int handle_request_fetch */
1675
1676 /* we came across a "WROTE" entry during journal replay.
1677  * throw away any values that we have accumulated for this file
1678  */
1679 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1680 {
1681   cache_item_t *ci;
1682   const char *file = buffer;
1683
1684   pthread_mutex_lock(&cache_lock);
1685
1686   ci = g_tree_lookup(cache_tree, file);
1687   if (ci == NULL)
1688   {
1689     pthread_mutex_unlock(&cache_lock);
1690     return (0);
1691   }
1692
1693   if (ci->values)
1694     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1695
1696   wipe_ci_values(ci, now);
1697   remove_from_queue(ci);
1698
1699   pthread_mutex_unlock(&cache_lock);
1700   return (0);
1701 } /* }}} int handle_request_wrote */
1702
1703 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1704 {
1705   char *file, file_tmp[PATH_MAX];
1706   int status;
1707   rrd_info_t *info;
1708
1709   /* obtain filename */
1710   status = buffer_get_field(&buffer, &buffer_size, &file);
1711   if (status != 0)
1712     return syntax_error(sock,cmd);
1713   /* get full pathname */
1714   get_abs_path(&file, file_tmp);
1715   if (!check_file_access(file, sock)) {
1716     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1717   }
1718   /* get data */
1719   rrd_clear_error ();
1720   info = rrd_info_r(file);
1721   if(!info) {
1722     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1723   }
1724   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1725       switch (data->type) {
1726       case RD_I_VAL:
1727           if (isnan(data->value.u_val))
1728               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1729           else
1730               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1731           break;
1732       case RD_I_CNT:
1733           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1734           break;
1735       case RD_I_INT:
1736           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1737           break;
1738       case RD_I_STR:
1739           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1740           break;
1741       case RD_I_BLO:
1742           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1743           break;
1744       }
1745   }
1746
1747   rrd_info_free(info);
1748
1749   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1750 } /* }}} static int handle_request_info  */
1751
1752 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1753 {
1754   char *i, *file, file_tmp[PATH_MAX];
1755   int status;
1756   int idx;
1757   time_t t;
1758
1759   /* obtain filename */
1760   status = buffer_get_field(&buffer, &buffer_size, &file);
1761   if (status != 0)
1762     return syntax_error(sock,cmd);
1763   /* get full pathname */
1764   get_abs_path(&file, file_tmp);
1765   if (!check_file_access(file, sock)) {
1766     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1767   }
1768
1769   status = buffer_get_field(&buffer, &buffer_size, &i);
1770   if (status != 0)
1771     return syntax_error(sock,cmd);
1772   idx = atoi(i);
1773   if(idx<0) { 
1774     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1775   }
1776
1777   /* get data */
1778   rrd_clear_error ();
1779   t = rrd_first_r(file,idx);
1780   if(t<1) {
1781     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1782   }
1783   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1784 } /* }}} static int handle_request_first  */
1785
1786
1787 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1788 {
1789   char *file, file_tmp[PATH_MAX];
1790   int status;
1791   time_t t, from_file, step;
1792   rrd_file_t * rrd_file;
1793   cache_item_t * ci;
1794   rrd_t rrd; 
1795
1796   /* obtain filename */
1797   status = buffer_get_field(&buffer, &buffer_size, &file);
1798   if (status != 0)
1799     return syntax_error(sock,cmd);
1800   /* get full pathname */
1801   get_abs_path(&file, file_tmp);
1802   if (!check_file_access(file, sock)) {
1803     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1804   }
1805   rrd_clear_error();
1806   rrd_init(&rrd);
1807   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1808   if(!rrd_file) {
1809     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1810   }
1811   from_file = rrd.live_head->last_up;
1812   step = rrd.stat_head->pdp_step;
1813   rrd_close(rrd_file);
1814   pthread_mutex_lock(&cache_lock);
1815   ci = g_tree_lookup(cache_tree, file);
1816   if (ci)
1817     t = ci->last_update_stamp;
1818   else
1819     t = from_file;
1820   pthread_mutex_unlock(&cache_lock);
1821   t -= t % step;
1822   rrd_free(&rrd);
1823   if(t<1) {
1824     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1825   }
1826   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1827 } /* }}} static int handle_request_last  */
1828
1829 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1830 {
1831   char *file, file_tmp[PATH_MAX];
1832   char *tok;
1833   int ac = 0;
1834   char *av[128];
1835   int status;
1836   unsigned long step = 300;
1837   time_t last_up = time(NULL)-10;
1838   int no_overwrite = opt_no_overwrite;
1839
1840
1841   /* obtain filename */
1842   status = buffer_get_field(&buffer, &buffer_size, &file);
1843   if (status != 0)
1844     return syntax_error(sock,cmd);
1845   /* get full pathname */
1846   get_abs_path(&file, file_tmp);
1847   if (!check_file_access(file, sock)) {
1848     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1849   }
1850   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1851
1852   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1853     if( ! strncmp(tok,"-b",2) ) {
1854       status = buffer_get_field(&buffer, &buffer_size, &tok );
1855       if (status != 0) return syntax_error(sock,cmd);
1856       last_up = (time_t) atol(tok);
1857       continue;
1858     }
1859     if( ! strncmp(tok,"-s",2) ) {
1860       status = buffer_get_field(&buffer, &buffer_size, &tok );
1861       if (status != 0) return syntax_error(sock,cmd);
1862       step = atol(tok);
1863       continue;
1864     }
1865     if( ! strncmp(tok,"-O",2) ) {
1866       no_overwrite = 1;
1867       continue;
1868     }
1869     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1870     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1871     return syntax_error(sock,cmd);
1872   }
1873   if(step<1) {
1874     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1875   }
1876   if (last_up < 3600 * 24 * 365 * 10) {
1877     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1878   }
1879
1880   rrd_clear_error ();
1881   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1882
1883   if(!status) {
1884     return send_response(sock, RESP_OK, "RRD created OK\n");
1885   }
1886   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1887 } /* }}} static int handle_request_create  */
1888
1889 /* start "BATCH" processing */
1890 static int batch_start (HANDLER_PROTO) /* {{{ */
1891 {
1892   int status;
1893   if (sock->batch_start)
1894     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1895
1896   status = send_response(sock, RESP_OK,
1897                          "Go ahead.  End with dot '.' on its own line.\n");
1898   sock->batch_start = time(NULL);
1899   sock->batch_cmd = 0;
1900
1901   return status;
1902 } /* }}} static int batch_start */
1903
1904 /* finish "BATCH" processing and return results to the client */
1905 static int batch_done (HANDLER_PROTO) /* {{{ */
1906 {
1907   assert(sock->batch_start);
1908   sock->batch_start = 0;
1909   sock->batch_cmd  = 0;
1910   return send_response(sock, RESP_OK, "errors\n");
1911 } /* }}} static int batch_done */
1912
1913 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1914 {
1915   return -1;
1916 } /* }}} static int handle_request_quit */
1917
1918 static command_t list_of_commands[] = { /* {{{ */
1919   {
1920     "UPDATE",
1921     handle_request_update,
1922     CMD_CONTEXT_ANY,
1923     "UPDATE <filename> <values> [<values> ...]\n"
1924     ,
1925     "Adds the given file to the internal cache if it is not yet known and\n"
1926     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1927     "for details.\n"
1928     "\n"
1929     "Each <values> has the following form:\n"
1930     "  <values> = <time>:<value>[:<value>[...]]\n"
1931     "See the rrdupdate(1) manpage for details.\n"
1932   },
1933   {
1934     "WROTE",
1935     handle_request_wrote,
1936     CMD_CONTEXT_JOURNAL,
1937     NULL,
1938     NULL
1939   },
1940   {
1941     "FLUSH",
1942     handle_request_flush,
1943     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1944     "FLUSH <filename>\n"
1945     ,
1946     "Adds the given filename to the head of the update queue and returns\n"
1947     "after it has been dequeued.\n"
1948   },
1949   {
1950     "FLUSHALL",
1951     handle_request_flushall,
1952     CMD_CONTEXT_CLIENT,
1953     "FLUSHALL\n"
1954     ,
1955     "Triggers writing of all pending updates.  Returns immediately.\n"
1956   },
1957   {
1958     "PENDING",
1959     handle_request_pending,
1960     CMD_CONTEXT_CLIENT,
1961     "PENDING <filename>\n"
1962     ,
1963     "Shows any 'pending' updates for a file, in order.\n"
1964     "The updates shown have not yet been written to the underlying RRD file.\n"
1965   },
1966   {
1967     "FORGET",
1968     handle_request_forget,
1969     CMD_CONTEXT_ANY,
1970     "FORGET <filename>\n"
1971     ,
1972     "Removes the file completely from the cache.\n"
1973     "Any pending updates for the file will be lost.\n"
1974   },
1975   {
1976     "QUEUE",
1977     handle_request_queue,
1978     CMD_CONTEXT_CLIENT,
1979     "QUEUE\n"
1980     ,
1981         "Shows all files in the output queue.\n"
1982     "The output is zero or more lines in the following format:\n"
1983     "(where <num_vals> is the number of values to be written)\n"
1984     "\n"
1985     "<num_vals> <filename>\n"
1986   },
1987   {
1988     "STATS",
1989     handle_request_stats,
1990     CMD_CONTEXT_CLIENT,
1991     "STATS\n"
1992     ,
1993     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1994     "a description of the values.\n"
1995   },
1996   {
1997     "HELP",
1998     handle_request_help,
1999     CMD_CONTEXT_CLIENT,
2000     "HELP [<command>]\n",
2001     NULL, /* special! */
2002   },
2003   {
2004     "BATCH",
2005     batch_start,
2006     CMD_CONTEXT_CLIENT,
2007     "BATCH\n"
2008     ,
2009     "The 'BATCH' command permits the client to initiate a bulk load\n"
2010     "   of commands to rrdcached.\n"
2011     "\n"
2012     "Usage:\n"
2013     "\n"
2014     "    client: BATCH\n"
2015     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2016     "    client: command #1\n"
2017     "    client: command #2\n"
2018     "    client: ... and so on\n"
2019     "    client: .\n"
2020     "    server: 2 errors\n"
2021     "    server: 7 message for command #7\n"
2022     "    server: 9 message for command #9\n"
2023     "\n"
2024     "For more information, consult the rrdcached(1) documentation.\n"
2025   },
2026   {
2027     ".",   /* BATCH terminator */
2028     batch_done,
2029     CMD_CONTEXT_BATCH,
2030     NULL,
2031     NULL
2032   },
2033   {
2034     "FETCH",
2035     handle_request_fetch,
2036     CMD_CONTEXT_CLIENT,
2037     "FETCH <file> <CF> [<start> [<end>]]\n"
2038     ,
2039     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2040   },
2041   {
2042     "INFO",
2043     handle_request_info,
2044     CMD_CONTEXT_CLIENT,
2045     "INFO <filename>\n",
2046     "The INFO command retrieves information about a specified RRD file.\n"
2047     "This is returned in standard rrdinfo format, a sequence of lines\n"
2048     "with the format <keyname> = <value>\n"
2049     "Note that this is the data as of the last update of the RRD file itself,\n"
2050     "not the last time data was received via rrdcached, so there may be pending\n"
2051     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2052   },
2053   {
2054     "FIRST",
2055     handle_request_first,
2056     CMD_CONTEXT_CLIENT,
2057     "FIRST <filename> <rra index>\n",
2058     "The FIRST command retrieves the first data time for a specified RRA in\n"
2059     "an RRD file.\n"
2060   },
2061   {
2062     "LAST",
2063     handle_request_last,
2064     CMD_CONTEXT_CLIENT,
2065     "LAST <filename>\n",
2066     "The LAST command retrieves the last update time for a specified RRD file.\n"
2067     "Note that this is the time of the last update of the RRD file itself, not\n"
2068     "the last time data was received via rrdcached, so there may be pending\n"
2069     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2070   },
2071   {
2072     "CREATE",
2073     handle_request_create,
2074     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2075     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2076     "The CREATE command will create an RRD file, overwriting any existing file\n"
2077     "unless the -O option is given or rrdcached was started with the -O option.\n"
2078     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2079     "not acceptable) and the step is in seconds (default is 300).\n"
2080     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2081   },
2082   {
2083     "QUIT",
2084     handle_request_quit,
2085     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2086     "QUIT\n"
2087     ,
2088     "Disconnect from rrdcached.\n"
2089   }
2090 }; /* }}} command_t list_of_commands[] */
2091 static size_t list_of_commands_len = sizeof (list_of_commands)
2092   / sizeof (list_of_commands[0]);
2093
2094 static command_t *find_command(char *cmd)
2095 {
2096   size_t i;
2097
2098   for (i = 0; i < list_of_commands_len; i++)
2099     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2100       return (&list_of_commands[i]);
2101   return NULL;
2102 }
2103
2104 /* We currently use the index in the `list_of_commands' array as a bit position
2105  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2106  * outside these functions so that switching to a more elegant storage method
2107  * is easily possible. */
2108 static ssize_t find_command_index (const char *cmd) /* {{{ */
2109 {
2110   size_t i;
2111
2112   for (i = 0; i < list_of_commands_len; i++)
2113     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2114       return ((ssize_t) i);
2115   return (-1);
2116 } /* }}} ssize_t find_command_index */
2117
2118 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2119     const char *cmd)
2120 {
2121   ssize_t i;
2122
2123   if (JOURNAL_REPLAY(sock))
2124     return (1);
2125
2126   if (cmd == NULL)
2127     return (-1);
2128
2129   if ((strcasecmp ("QUIT", cmd) == 0)
2130       || (strcasecmp ("HELP", cmd) == 0))
2131     return (1);
2132   else if (strcmp (".", cmd) == 0)
2133     cmd = "BATCH";
2134
2135   i = find_command_index (cmd);
2136   if (i < 0)
2137     return (-1);
2138   assert (i < 32);
2139
2140   if ((sock->permissions & (1 << i)) != 0)
2141     return (1);
2142   return (0);
2143 } /* }}} int socket_permission_check */
2144
2145 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2146     const char *cmd)
2147 {
2148   ssize_t i;
2149
2150   i = find_command_index (cmd);
2151   if (i < 0)
2152     return (-1);
2153   assert (i < 32);
2154
2155   sock->permissions |= (1 << i);
2156   return (0);
2157 } /* }}} int socket_permission_add */
2158
2159 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2160 {
2161   sock->permissions = 0;
2162 } /* }}} socket_permission_clear */
2163
2164 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2165     listen_socket_t *src)
2166 {
2167   dest->permissions = src->permissions;
2168 } /* }}} socket_permission_copy */
2169
2170 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2171 {
2172   size_t i;
2173
2174   sock->permissions = 0;
2175   for (i = 0; i < list_of_commands_len; i++)
2176     sock->permissions |= (1 << i);
2177 } /* }}} void socket_permission_set_all */
2178
2179 /* check whether commands are received in the expected context */
2180 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2181 {
2182   if (JOURNAL_REPLAY(sock))
2183     return (cmd->context & CMD_CONTEXT_JOURNAL);
2184   else if (sock->batch_start)
2185     return (cmd->context & CMD_CONTEXT_BATCH);
2186   else
2187     return (cmd->context & CMD_CONTEXT_CLIENT);
2188
2189   /* NOTREACHED */
2190   assert(1==0);
2191 }
2192
2193 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2194 {
2195   int status;
2196   char *cmd_str;
2197   char *resp_txt;
2198   command_t *help = NULL;
2199
2200   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2201   if (status == 0)
2202     help = find_command(cmd_str);
2203
2204   if (help && (help->syntax || help->help))
2205   {
2206     char tmp[RRD_CMD_MAX];
2207
2208     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2209     resp_txt = tmp;
2210
2211     if (help->syntax)
2212       add_response_info(sock, "Usage: %s\n", help->syntax);
2213
2214     if (help->help)
2215       add_response_info(sock, "%s\n", help->help);
2216   }
2217   else
2218   {
2219     size_t i;
2220
2221     resp_txt = "Command overview\n";
2222
2223     for (i = 0; i < list_of_commands_len; i++)
2224     {
2225       if (list_of_commands[i].syntax == NULL)
2226         continue;
2227       add_response_info (sock, "%s", list_of_commands[i].syntax);
2228     }
2229   }
2230
2231   return send_response(sock, RESP_OK, resp_txt);
2232 } /* }}} int handle_request_help */
2233
2234 static int handle_request (DISPATCH_PROTO) /* {{{ */
2235 {
2236   char *buffer_ptr = buffer;
2237   char *cmd_str = NULL;
2238   command_t *cmd = NULL;
2239   int status;
2240
2241   assert (buffer[buffer_size - 1] == '\0');
2242
2243   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2244   if (status != 0)
2245   {
2246     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2247     return (-1);
2248   }
2249
2250   if (sock != NULL && sock->batch_start)
2251     sock->batch_cmd++;
2252
2253   cmd = find_command(cmd_str);
2254   if (!cmd)
2255     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2256
2257   if (!socket_permission_check (sock, cmd->cmd))
2258     return send_response(sock, RESP_ERR, "Permission denied.\n");
2259
2260   if (!command_check_context(sock, cmd))
2261     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2262
2263   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2264 } /* }}} int handle_request */
2265
2266 static void journal_set_free (journal_set *js) /* {{{ */
2267 {
2268   if (js == NULL)
2269     return;
2270
2271   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2272
2273   free(js);
2274 } /* }}} journal_set_free */
2275
2276 static void journal_set_remove (journal_set *js) /* {{{ */
2277 {
2278   if (js == NULL)
2279     return;
2280
2281   for (uint i=0; i < js->files_num; i++)
2282   {
2283     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2284     unlink(js->files[i]);
2285   }
2286 } /* }}} journal_set_remove */
2287
2288 /* close current journal file handle.
2289  * MUST hold journal_lock before calling */
2290 static void journal_close(void) /* {{{ */
2291 {
2292   if (journal_fh != NULL)
2293   {
2294     if (fclose(journal_fh) != 0)
2295       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2296   }
2297
2298   journal_fh = NULL;
2299   journal_size = 0;
2300 } /* }}} journal_close */
2301
2302 /* MUST hold journal_lock before calling */
2303 static void journal_new_file(void) /* {{{ */
2304 {
2305   struct timeval now;
2306   int  new_fd;
2307   char new_file[PATH_MAX + 1];
2308
2309   assert(journal_dir != NULL);
2310   assert(journal_cur != NULL);
2311
2312   journal_close();
2313
2314   gettimeofday(&now, NULL);
2315   /* this format assures that the files sort in strcmp() order */
2316   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2317            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2318
2319   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2320                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2321   if (new_fd < 0)
2322     goto error;
2323
2324   journal_fh = fdopen(new_fd, "a");
2325   if (journal_fh == NULL)
2326     goto error;
2327
2328   journal_size = ftell(journal_fh);
2329   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2330
2331   /* record the file in the journal set */
2332   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2333
2334   return;
2335
2336 error:
2337   RRDD_LOG(LOG_CRIT,
2338            "JOURNALING DISABLED: Error while trying to create %s : %s",
2339            new_file, rrd_strerror(errno));
2340   RRDD_LOG(LOG_CRIT,
2341            "JOURNALING DISABLED: All values will be flushed at shutdown");
2342
2343   close(new_fd);
2344   config_flush_at_shutdown = 1;
2345
2346 } /* }}} journal_new_file */
2347
2348 /* MUST NOT hold journal_lock before calling this */
2349 static void journal_rotate(void) /* {{{ */
2350 {
2351   journal_set *old_js = NULL;
2352
2353   if (journal_dir == NULL)
2354     return;
2355
2356   RRDD_LOG(LOG_DEBUG, "rotating journals");
2357
2358   pthread_mutex_lock(&stats_lock);
2359   ++stats_journal_rotate;
2360   pthread_mutex_unlock(&stats_lock);
2361
2362   pthread_mutex_lock(&journal_lock);
2363
2364   journal_close();
2365
2366   /* rotate the journal sets */
2367   old_js = journal_old;
2368   journal_old = journal_cur;
2369   journal_cur = calloc(1, sizeof(journal_set));
2370
2371   if (journal_cur != NULL)
2372     journal_new_file();
2373   else
2374     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2375
2376   pthread_mutex_unlock(&journal_lock);
2377
2378   journal_set_remove(old_js);
2379   journal_set_free  (old_js);
2380
2381 } /* }}} static void journal_rotate */
2382
2383 /* MUST hold journal_lock when calling */
2384 static void journal_done(void) /* {{{ */
2385 {
2386   if (journal_cur == NULL)
2387     return;
2388
2389   journal_close();
2390
2391   if (config_flush_at_shutdown)
2392   {
2393     RRDD_LOG(LOG_INFO, "removing journals");
2394     journal_set_remove(journal_old);
2395     journal_set_remove(journal_cur);
2396   }
2397   else
2398   {
2399     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2400              "journals will be used at next startup");
2401   }
2402
2403   journal_set_free(journal_cur);
2404   journal_set_free(journal_old);
2405   free(journal_dir);
2406
2407 } /* }}} static void journal_done */
2408
2409 static int journal_write(char *cmd, char *args) /* {{{ */
2410 {
2411   int chars;
2412
2413   if (journal_fh == NULL)
2414     return 0;
2415
2416   pthread_mutex_lock(&journal_lock);
2417   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2418   journal_size += chars;
2419
2420   if (journal_size > JOURNAL_MAX)
2421     journal_new_file();
2422
2423   pthread_mutex_unlock(&journal_lock);
2424
2425   if (chars > 0)
2426   {
2427     pthread_mutex_lock(&stats_lock);
2428     stats_journal_bytes += chars;
2429     pthread_mutex_unlock(&stats_lock);
2430   }
2431
2432   return chars;
2433 } /* }}} static int journal_write */
2434
2435 static int journal_replay (const char *file) /* {{{ */
2436 {
2437   FILE *fh;
2438   int entry_cnt = 0;
2439   int fail_cnt = 0;
2440   uint64_t line = 0;
2441   char entry[RRD_CMD_MAX];
2442   time_t now;
2443
2444   if (file == NULL) return 0;
2445
2446   {
2447     char *reason = "unknown error";
2448     int status = 0;
2449     struct stat statbuf;
2450
2451     memset(&statbuf, 0, sizeof(statbuf));
2452     if (stat(file, &statbuf) != 0)
2453     {
2454       reason = "stat error";
2455       status = errno;
2456     }
2457     else if (!S_ISREG(statbuf.st_mode))
2458     {
2459       reason = "not a regular file";
2460       status = EPERM;
2461     }
2462     if (statbuf.st_uid != daemon_uid)
2463     {
2464       reason = "not owned by daemon user";
2465       status = EACCES;
2466     }
2467     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2468     {
2469       reason = "must not be user/group writable";
2470       status = EACCES;
2471     }
2472
2473     if (status != 0)
2474     {
2475       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2476                file, rrd_strerror(status), reason);
2477       return 0;
2478     }
2479   }
2480
2481   fh = fopen(file, "r");
2482   if (fh == NULL)
2483   {
2484     if (errno != ENOENT)
2485       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2486                file, rrd_strerror(errno));
2487     return 0;
2488   }
2489   else
2490     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2491
2492   now = time(NULL);
2493
2494   while(!feof(fh))
2495   {
2496     size_t entry_len;
2497
2498     ++line;
2499     if (fgets(entry, sizeof(entry), fh) == NULL)
2500       break;
2501     entry_len = strlen(entry);
2502
2503     /* check \n termination in case journal writing crashed mid-line */
2504     if (entry_len == 0)
2505       continue;
2506     else if (entry[entry_len - 1] != '\n')
2507     {
2508       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2509       ++fail_cnt;
2510       continue;
2511     }
2512
2513     entry[entry_len - 1] = '\0';
2514
2515     if (handle_request(NULL, now, entry, entry_len) == 0)
2516       ++entry_cnt;
2517     else
2518       ++fail_cnt;
2519   }
2520
2521   fclose(fh);
2522
2523   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2524            entry_cnt, fail_cnt);
2525
2526   return entry_cnt > 0 ? 1 : 0;
2527 } /* }}} static int journal_replay */
2528
2529 static int journal_sort(const void *v1, const void *v2)
2530 {
2531   char **jn1 = (char **) v1;
2532   char **jn2 = (char **) v2;
2533
2534   return strcmp(*jn1,*jn2);
2535 }
2536
2537 static void journal_init(void) /* {{{ */
2538 {
2539   int had_journal = 0;
2540   DIR *dir;
2541   struct dirent *dent;
2542   char path[PATH_MAX+1];
2543
2544   if (journal_dir == NULL) return;
2545
2546   pthread_mutex_lock(&journal_lock);
2547
2548   journal_cur = calloc(1, sizeof(journal_set));
2549   if (journal_cur == NULL)
2550   {
2551     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2552     return;
2553   }
2554
2555   RRDD_LOG(LOG_INFO, "checking for journal files");
2556
2557   /* Handle old journal files during transition.  This gives them the
2558    * correct sort order.  TODO: remove after first release
2559    */
2560   {
2561     char old_path[PATH_MAX+1];
2562     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2563     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2564     rename(old_path, path);
2565
2566     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2567     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2568     rename(old_path, path);
2569   }
2570
2571   dir = opendir(journal_dir);
2572   if (!dir) {
2573     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2574     return;
2575   }
2576   while ((dent = readdir(dir)) != NULL)
2577   {
2578     /* looks like a journal file? */
2579     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2580       continue;
2581
2582     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2583
2584     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2585     {
2586       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2587                dent->d_name);
2588       break;
2589     }
2590   }
2591   closedir(dir);
2592
2593   qsort(journal_cur->files, journal_cur->files_num,
2594         sizeof(journal_cur->files[0]), journal_sort);
2595
2596   for (uint i=0; i < journal_cur->files_num; i++)
2597     had_journal += journal_replay(journal_cur->files[i]);
2598
2599   journal_new_file();
2600
2601   /* it must have been a crash.  start a flush */
2602   if (had_journal && config_flush_at_shutdown)
2603     flush_old_values(-1);
2604
2605   pthread_mutex_unlock(&journal_lock);
2606
2607   RRDD_LOG(LOG_INFO, "journal processing complete");
2608
2609 } /* }}} static void journal_init */
2610
2611 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2612 {
2613   assert(sock != NULL);
2614
2615   free(sock->rbuf);  sock->rbuf = NULL;
2616   free(sock->wbuf);  sock->wbuf = NULL;
2617   free(sock);
2618 } /* }}} void free_listen_socket */
2619
2620 static void close_connection(listen_socket_t *sock) /* {{{ */
2621 {
2622   if (sock->fd >= 0)
2623   {
2624     close(sock->fd);
2625     sock->fd = -1;
2626   }
2627
2628   free_listen_socket(sock);
2629
2630 } /* }}} void close_connection */
2631
2632 static void *connection_thread_main (void *args) /* {{{ */
2633 {
2634   listen_socket_t *sock;
2635   int fd;
2636
2637   sock = (listen_socket_t *) args;
2638   fd = sock->fd;
2639
2640   /* init read buffers */
2641   sock->next_read = sock->next_cmd = 0;
2642   sock->rbuf = malloc(RBUF_SIZE);
2643   if (sock->rbuf == NULL)
2644   {
2645     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2646     close_connection(sock);
2647     return NULL;
2648   }
2649
2650   pthread_mutex_lock (&connection_threads_lock);
2651 #ifdef HAVE_LIBWRAP
2652   /* LIBWRAP does not support multiple threads! By putting this code
2653      inside pthread_mutex_lock we do not have to worry about request_info
2654      getting overwritten by another thread.
2655   */
2656   struct request_info req;
2657   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2658   fromhost(&req);
2659   if(!hosts_access(&req)) {
2660     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2661     pthread_mutex_unlock (&connection_threads_lock);
2662     close_connection(sock);
2663     return NULL;
2664   }
2665 #endif /* HAVE_LIBWRAP */
2666   connection_threads_num++;
2667   pthread_mutex_unlock (&connection_threads_lock);
2668
2669   while (state == RUNNING)
2670   {
2671     char *cmd;
2672     ssize_t cmd_len;
2673     ssize_t rbytes;
2674     time_t now;
2675
2676     struct pollfd pollfd;
2677     int status;
2678
2679     pollfd.fd = fd;
2680     pollfd.events = POLLIN | POLLPRI;
2681     pollfd.revents = 0;
2682
2683     status = poll (&pollfd, 1, /* timeout = */ 500);
2684     if (state != RUNNING)
2685       break;
2686     else if (status == 0) /* timeout */
2687       continue;
2688     else if (status < 0) /* error */
2689     {
2690       status = errno;
2691       if (status != EINTR)
2692         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2693       continue;
2694     }
2695
2696     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2697       break;
2698     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2699     {
2700       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2701           "poll(2) returned something unexpected: %#04hx",
2702           pollfd.revents);
2703       break;
2704     }
2705
2706     rbytes = read(fd, sock->rbuf + sock->next_read,
2707                   RBUF_SIZE - sock->next_read);
2708     if (rbytes < 0)
2709     {
2710       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2711       break;
2712     }
2713     else if (rbytes == 0)
2714       break; /* eof */
2715
2716     sock->next_read += rbytes;
2717
2718     if (sock->batch_start)
2719       now = sock->batch_start;
2720     else
2721       now = time(NULL);
2722
2723     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2724     {
2725       status = handle_request (sock, now, cmd, cmd_len+1);
2726       if (status != 0)
2727         goto out_close;
2728     }
2729   }
2730
2731 out_close:
2732   close_connection(sock);
2733
2734   /* Remove this thread from the connection threads list */
2735   pthread_mutex_lock (&connection_threads_lock);
2736   connection_threads_num--;
2737   if (connection_threads_num <= 0)
2738     pthread_cond_broadcast(&connection_threads_done);
2739   pthread_mutex_unlock (&connection_threads_lock);
2740
2741   return (NULL);
2742 } /* }}} void *connection_thread_main */
2743
2744 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2745 {
2746   int fd;
2747   struct sockaddr_un sa;
2748   listen_socket_t *temp;
2749   int status;
2750   const char *path;
2751   char *path_copy, *dir;
2752
2753   path = sock->addr;
2754   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2755     path += strlen("unix:");
2756
2757   /* dirname may modify its argument */
2758   path_copy = strdup(path);
2759   if (path_copy == NULL)
2760   {
2761     fprintf(stderr, "rrdcached: strdup(): %s\n",
2762         rrd_strerror(errno));
2763     return (-1);
2764   }
2765
2766   dir = dirname(path_copy);
2767   if (rrd_mkdir_p(dir, 0777) != 0)
2768   {
2769     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2770         dir, rrd_strerror(errno));
2771     return (-1);
2772   }
2773
2774   free(path_copy);
2775
2776   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2777       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2778   if (temp == NULL)
2779   {
2780     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2781     return (-1);
2782   }
2783   listen_fds = temp;
2784   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2785
2786   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2787   if (fd < 0)
2788   {
2789     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2790              rrd_strerror(errno));
2791     return (-1);
2792   }
2793
2794   memset (&sa, 0, sizeof (sa));
2795   sa.sun_family = AF_UNIX;
2796   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2797
2798   /* if we've gotten this far, we own the pid file.  any daemon started
2799    * with the same args must not be alive.  therefore, ensure that we can
2800    * create the socket...
2801    */
2802   unlink(path);
2803
2804   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2805   if (status != 0)
2806   {
2807     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2808              path, rrd_strerror(errno));
2809     close (fd);
2810     return (-1);
2811   }
2812
2813   /* tweak the sockets group ownership */
2814   if (sock->socket_group != (gid_t)-1)
2815   {
2816     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2817          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2818     {
2819       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2820     }
2821   }
2822
2823   if (sock->socket_permissions != (mode_t)-1)
2824   {
2825     if (chmod(path, sock->socket_permissions) != 0)
2826       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2827           (unsigned int)sock->socket_permissions, strerror(errno));
2828   }
2829
2830   status = listen (fd, /* backlog = */ 10);
2831   if (status != 0)
2832   {
2833     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2834              path, rrd_strerror(errno));
2835     close (fd);
2836     unlink (path);
2837     return (-1);
2838   }
2839
2840   listen_fds[listen_fds_num].fd = fd;
2841   listen_fds[listen_fds_num].family = PF_UNIX;
2842   strncpy(listen_fds[listen_fds_num].addr, path,
2843           sizeof (listen_fds[listen_fds_num].addr) - 1);
2844   listen_fds_num++;
2845
2846   return (0);
2847 } /* }}} int open_listen_socket_unix */
2848
2849 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2850 {
2851   struct addrinfo ai_hints;
2852   struct addrinfo *ai_res;
2853   struct addrinfo *ai_ptr;
2854   char addr_copy[NI_MAXHOST];
2855   char *addr;
2856   char *port;
2857   int status;
2858
2859   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2860   addr_copy[sizeof (addr_copy) - 1] = 0;
2861   addr = addr_copy;
2862
2863   memset (&ai_hints, 0, sizeof (ai_hints));
2864   ai_hints.ai_flags = 0;
2865 #ifdef AI_ADDRCONFIG
2866   ai_hints.ai_flags |= AI_ADDRCONFIG;
2867 #endif
2868   ai_hints.ai_family = AF_UNSPEC;
2869   ai_hints.ai_socktype = SOCK_STREAM;
2870
2871   port = NULL;
2872   if (*addr == '[') /* IPv6+port format */
2873   {
2874     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2875     addr++;
2876
2877     port = strchr (addr, ']');
2878     if (port == NULL)
2879     {
2880       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2881       return (-1);
2882     }
2883     *port = 0;
2884     port++;
2885
2886     if (*port == ':')
2887       port++;
2888     else if (*port == 0)
2889       port = NULL;
2890     else
2891     {
2892       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2893       return (-1);
2894     }
2895   } /* if (*addr == '[') */
2896   else
2897   {
2898     port = rindex(addr, ':');
2899     if (port != NULL)
2900     {
2901       *port = 0;
2902       port++;
2903     }
2904   }
2905   ai_res = NULL;
2906   status = getaddrinfo (addr,
2907                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2908                         &ai_hints, &ai_res);
2909   if (status != 0)
2910   {
2911     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2912              addr, gai_strerror (status));
2913     return (-1);
2914   }
2915
2916   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2917   {
2918     int fd;
2919     listen_socket_t *temp;
2920     int one = 1;
2921
2922     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2923         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2924     if (temp == NULL)
2925     {
2926       fprintf (stderr,
2927                "rrdcached: open_listen_socket_network: realloc failed.\n");
2928       continue;
2929     }
2930     listen_fds = temp;
2931     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2932
2933     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2934     if (fd < 0)
2935     {
2936       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2937                rrd_strerror(errno));
2938       continue;
2939     }
2940
2941     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2942
2943     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2944     if (status != 0)
2945     {
2946       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2947                sock->addr, rrd_strerror(errno));
2948       close (fd);
2949       continue;
2950     }
2951
2952     status = listen (fd, /* backlog = */ 10);
2953     if (status != 0)
2954     {
2955       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2956                sock->addr, rrd_strerror(errno));
2957       close (fd);
2958       freeaddrinfo(ai_res);
2959       return (-1);
2960     }
2961
2962     listen_fds[listen_fds_num].fd = fd;
2963     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2964     listen_fds_num++;
2965   } /* for (ai_ptr) */
2966
2967   freeaddrinfo(ai_res);
2968   return (0);
2969 } /* }}} static int open_listen_socket_network */
2970
2971 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2972 {
2973   assert(sock != NULL);
2974   assert(sock->addr != NULL);
2975
2976   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2977       || sock->addr[0] == '/')
2978     return (open_listen_socket_unix(sock));
2979   else
2980     return (open_listen_socket_network(sock));
2981 } /* }}} int open_listen_socket */
2982
2983 static int close_listen_sockets (void) /* {{{ */
2984 {
2985   size_t i;
2986
2987   for (i = 0; i < listen_fds_num; i++)
2988   {
2989     close (listen_fds[i].fd);
2990
2991     if (listen_fds[i].family == PF_UNIX)
2992       unlink(listen_fds[i].addr);
2993   }
2994
2995   free (listen_fds);
2996   listen_fds = NULL;
2997   listen_fds_num = 0;
2998
2999   return (0);
3000 } /* }}} int close_listen_sockets */
3001
3002 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3003 {
3004   struct pollfd *pollfds;
3005   int pollfds_num;
3006   int status;
3007   int i;
3008
3009   if (listen_fds_num < 1)
3010   {
3011     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3012     return (NULL);
3013   }
3014
3015   pollfds_num = listen_fds_num;
3016   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3017   if (pollfds == NULL)
3018   {
3019     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3020     return (NULL);
3021   }
3022   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3023
3024   RRDD_LOG(LOG_INFO, "listening for connections");
3025
3026   while (state == RUNNING)
3027   {
3028     for (i = 0; i < pollfds_num; i++)
3029     {
3030       pollfds[i].fd = listen_fds[i].fd;
3031       pollfds[i].events = POLLIN | POLLPRI;
3032       pollfds[i].revents = 0;
3033     }
3034
3035     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3036     if (state != RUNNING)
3037       break;
3038     else if (status == 0) /* timeout */
3039       continue;
3040     else if (status < 0) /* error */
3041     {
3042       status = errno;
3043       if (status != EINTR)
3044       {
3045         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3046       }
3047       continue;
3048     }
3049
3050     for (i = 0; i < pollfds_num; i++)
3051     {
3052       listen_socket_t *client_sock;
3053       struct sockaddr_storage client_sa;
3054       socklen_t client_sa_size;
3055       pthread_t tid;
3056       pthread_attr_t attr;
3057
3058       if (pollfds[i].revents == 0)
3059         continue;
3060
3061       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3062       {
3063         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3064             "poll(2) returned something unexpected for listen FD #%i.",
3065             pollfds[i].fd);
3066         continue;
3067       }
3068
3069       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3070       if (client_sock == NULL)
3071       {
3072         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3073         continue;
3074       }
3075       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3076
3077       client_sa_size = sizeof (client_sa);
3078       client_sock->fd = accept (pollfds[i].fd,
3079           (struct sockaddr *) &client_sa, &client_sa_size);
3080       if (client_sock->fd < 0)
3081       {
3082         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3083         free(client_sock);
3084         continue;
3085       }
3086
3087       pthread_attr_init (&attr);
3088       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3089
3090       status = pthread_create (&tid, &attr, connection_thread_main,
3091                                client_sock);
3092       if (status != 0)
3093       {
3094         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3095         close_connection(client_sock);
3096         continue;
3097       }
3098     } /* for (pollfds_num) */
3099   } /* while (state == RUNNING) */
3100
3101   RRDD_LOG(LOG_INFO, "starting shutdown");
3102
3103   close_listen_sockets ();
3104
3105   pthread_mutex_lock (&connection_threads_lock);
3106   while (connection_threads_num > 0)
3107     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3108   pthread_mutex_unlock (&connection_threads_lock);
3109
3110   free(pollfds);
3111
3112   return (NULL);
3113 } /* }}} void *listen_thread_main */
3114
3115 static int daemonize (void) /* {{{ */
3116 {
3117   int pid_fd;
3118   char *base_dir;
3119
3120   daemon_uid = geteuid();
3121
3122   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3123   if (pid_fd < 0)
3124     pid_fd = check_pidfile();
3125   if (pid_fd < 0)
3126     return pid_fd;
3127
3128   /* open all the listen sockets */
3129   if (config_listen_address_list_len > 0)
3130   {
3131     for (size_t i = 0; i < config_listen_address_list_len; i++)
3132       open_listen_socket (config_listen_address_list[i]);
3133
3134     rrd_free_ptrs((void ***) &config_listen_address_list,
3135                   &config_listen_address_list_len);
3136   }
3137   else
3138   {
3139     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3140         sizeof(default_socket.addr) - 1);
3141     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3142
3143     if (default_socket.permissions == 0)
3144       socket_permission_set_all (&default_socket);
3145
3146     open_listen_socket (&default_socket);
3147   }
3148
3149   if (listen_fds_num < 1)
3150   {
3151     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3152     goto error;
3153   }
3154
3155   if (!stay_foreground)
3156   {
3157     pid_t child;
3158
3159     child = fork ();
3160     if (child < 0)
3161     {
3162       fprintf (stderr, "daemonize: fork(2) failed.\n");
3163       goto error;
3164     }
3165     else if (child > 0)
3166       exit(0);
3167
3168     /* Become session leader */
3169     setsid ();
3170
3171     /* Open the first three file descriptors to /dev/null */
3172     close (2);
3173     close (1);
3174     close (0);
3175
3176     open ("/dev/null", O_RDWR);
3177     if (dup(0) == -1 || dup(0) == -1){
3178         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3179     }
3180   } /* if (!stay_foreground) */
3181
3182   /* Change into the /tmp directory. */
3183   base_dir = (config_base_dir != NULL)
3184     ? config_base_dir
3185     : "/tmp";
3186
3187   if (chdir (base_dir) != 0)
3188   {
3189     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3190     goto error;
3191   }
3192
3193   install_signal_handlers();
3194
3195   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3196   RRDD_LOG(LOG_INFO, "starting up");
3197
3198   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3199                                 (GDestroyNotify) free_cache_item);
3200   if (cache_tree == NULL)
3201   {
3202     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3203     goto error;
3204   }
3205
3206   return write_pidfile (pid_fd);
3207
3208 error:
3209   remove_pidfile();
3210   return -1;
3211 } /* }}} int daemonize */
3212
3213 static int cleanup (void) /* {{{ */
3214 {
3215   pthread_cond_broadcast (&flush_cond);
3216   pthread_join (flush_thread, NULL);
3217
3218   pthread_cond_broadcast (&queue_cond);
3219   for (int i = 0; i < config_queue_threads; i++)
3220     pthread_join (queue_threads[i], NULL);
3221
3222   if (config_flush_at_shutdown)
3223   {
3224     assert(cache_queue_head == NULL);
3225     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3226   }
3227
3228   free(queue_threads);
3229   free(config_base_dir);
3230
3231   pthread_mutex_lock(&cache_lock);
3232   g_tree_destroy(cache_tree);
3233
3234   pthread_mutex_lock(&journal_lock);
3235   journal_done();
3236
3237   RRDD_LOG(LOG_INFO, "goodbye");
3238   closelog ();
3239
3240   remove_pidfile ();
3241   free(config_pid_file);
3242
3243   return (0);
3244 } /* }}} int cleanup */
3245
3246 static int read_options (int argc, char **argv) /* {{{ */
3247 {
3248   int option;
3249   int status = 0;
3250
3251   socket_permission_clear (&default_socket);
3252
3253   default_socket.socket_group = (gid_t)-1;
3254   default_socket.socket_permissions = (mode_t)-1;
3255
3256   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3257   {
3258     switch (option)
3259     {
3260       case 'O':
3261         opt_no_overwrite = 1;
3262         break;
3263
3264       case 'g':
3265         stay_foreground=1;
3266         break;
3267
3268       case 'l':
3269       {
3270         listen_socket_t *new;
3271
3272         new = malloc(sizeof(listen_socket_t));
3273         if (new == NULL)
3274         {
3275           fprintf(stderr, "read_options: malloc failed.\n");
3276           return(2);
3277         }
3278         memset(new, 0, sizeof(listen_socket_t));
3279
3280         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3281
3282         /* Add permissions to the socket {{{ */
3283         if (default_socket.permissions != 0)
3284         {
3285           socket_permission_copy (new, &default_socket);
3286         }
3287         else /* if (default_socket.permissions == 0) */
3288         {
3289           /* Add permission for ALL commands to the socket. */
3290           socket_permission_set_all (new);
3291         }
3292         /* }}} Done adding permissions. */
3293
3294         new->socket_group = default_socket.socket_group;
3295         new->socket_permissions = default_socket.socket_permissions;
3296
3297         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3298                          &config_listen_address_list_len, new))
3299         {
3300           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3301           return (2);
3302         }
3303       }
3304       break;
3305
3306       /* set socket group permissions */
3307       case 's':
3308       {
3309         gid_t group_gid;
3310         struct group *grp;
3311
3312         group_gid = strtoul(optarg, NULL, 10);
3313         if (errno != EINVAL && group_gid>0)
3314         {
3315           /* we were passed a number */
3316           grp = getgrgid(group_gid);
3317         }
3318         else
3319         {
3320           grp = getgrnam(optarg);
3321         }
3322
3323         if (grp)
3324         {
3325           default_socket.socket_group = grp->gr_gid;
3326         }
3327         else
3328         {
3329           /* no idea what the user wanted... */
3330           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3331           return (5);
3332         }
3333       }
3334       break;
3335
3336       /* set socket file permissions */
3337       case 'm':
3338       {
3339         long  tmp;
3340         char *endptr = NULL;
3341
3342         tmp = strtol (optarg, &endptr, 8);
3343         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3344             || (tmp > 07777) || (tmp < 0)) {
3345           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3346               optarg);
3347           return (5);
3348         }
3349
3350         default_socket.socket_permissions = (mode_t)tmp;
3351       }
3352       break;
3353
3354       case 'P':
3355       {
3356         char *optcopy;
3357         char *saveptr;
3358         char *dummy;
3359         char *ptr;
3360
3361         socket_permission_clear (&default_socket);
3362
3363         optcopy = strdup (optarg);
3364         dummy = optcopy;
3365         saveptr = NULL;
3366         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3367         {
3368           dummy = NULL;
3369           status = socket_permission_add (&default_socket, ptr);
3370           if (status != 0)
3371           {
3372             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3373                 "socket failed. Most likely, this permission doesn't "
3374                 "exist. Check your command line.\n", ptr);
3375             status = 4;
3376           }
3377         }
3378
3379         free (optcopy);
3380       }
3381       break;
3382
3383       case 'f':
3384       {
3385         int temp;
3386
3387         temp = atoi (optarg);
3388         if (temp > 0)
3389           config_flush_interval = temp;
3390         else
3391         {
3392           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3393           status = 3;
3394         }
3395       }
3396       break;
3397
3398       case 'w':
3399       {
3400         int temp;
3401
3402         temp = atoi (optarg);
3403         if (temp > 0)
3404           config_write_interval = temp;
3405         else
3406         {
3407           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3408           status = 2;
3409         }
3410       }
3411       break;
3412
3413       case 'z':
3414       {
3415         int temp;
3416
3417         temp = atoi(optarg);
3418         if (temp > 0)
3419           config_write_jitter = temp;
3420         else
3421         {
3422           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3423           status = 2;
3424         }
3425
3426         break;
3427       }
3428
3429       case 't':
3430       {
3431         int threads;
3432         threads = atoi(optarg);
3433         if (threads >= 1)
3434           config_queue_threads = threads;
3435         else
3436         {
3437           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3438           return 1;
3439         }
3440       }
3441       break;
3442
3443       case 'B':
3444         config_write_base_only = 1;
3445         break;
3446
3447       case 'b':
3448       {
3449         size_t len;
3450         char base_realpath[PATH_MAX];
3451
3452         if (config_base_dir != NULL)
3453           free (config_base_dir);
3454         config_base_dir = strdup (optarg);
3455         if (config_base_dir == NULL)
3456         {
3457           fprintf (stderr, "read_options: strdup failed.\n");
3458           return (3);
3459         }
3460
3461         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3462         {
3463           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3464               config_base_dir, rrd_strerror (errno));
3465           return (3);
3466         }
3467
3468         /* make sure that the base directory is not resolved via
3469          * symbolic links.  this makes some performance-enhancing
3470          * assumptions possible (we don't have to resolve paths
3471          * that start with a "/")
3472          */
3473         if (realpath(config_base_dir, base_realpath) == NULL)
3474         {
3475           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3476               "%s\n", config_base_dir, rrd_strerror(errno));
3477           return 5;
3478         }
3479
3480         len = strlen (config_base_dir);
3481         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3482         {
3483           config_base_dir[len - 1] = 0;
3484           len--;
3485         }
3486
3487         if (len < 1)
3488         {
3489           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3490           return (4);
3491         }
3492
3493         _config_base_dir_len = len;
3494
3495         len = strlen (base_realpath);
3496         while ((len > 0) && (base_realpath[len - 1] == '/'))
3497         {
3498           base_realpath[len - 1] = '\0';
3499           len--;
3500         }
3501
3502         if (strncmp(config_base_dir,
3503                          base_realpath, sizeof(base_realpath)) != 0)
3504         {
3505           fprintf(stderr,
3506                   "Base directory (-b) resolved via file system links!\n"
3507                   "Please consult rrdcached '-b' documentation!\n"
3508                   "Consider specifying the real directory (%s)\n",
3509                   base_realpath);
3510           return 5;
3511         }
3512       }
3513       break;
3514
3515       case 'p':
3516       {
3517         if (config_pid_file != NULL)
3518           free (config_pid_file);
3519         config_pid_file = strdup (optarg);
3520         if (config_pid_file == NULL)
3521         {
3522           fprintf (stderr, "read_options: strdup failed.\n");
3523           return (3);
3524         }
3525       }
3526       break;
3527
3528       case 'F':
3529         config_flush_at_shutdown = 1;
3530         break;
3531
3532       case 'j':
3533       {
3534         char journal_dir_actual[PATH_MAX];
3535         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3536         if (journal_dir)
3537         {
3538           // if we were able to properly resolve the path, lets have a copy
3539           // for use outside this block.
3540           journal_dir = strdup(journal_dir);           
3541           status = rrd_mkdir_p(journal_dir, 0777);
3542           if (status != 0)
3543           {
3544             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3545                     journal_dir, rrd_strerror(errno));
3546             return 6;
3547           }
3548           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3549           {
3550             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3551                     errno ? rrd_strerror(errno) : "");
3552             return 6;
3553           }
3554         } else {
3555           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3556                   errno ? rrd_strerror(errno) : "");
3557           return 6;
3558         }
3559       }
3560       break;
3561
3562       case 'a':
3563       {
3564         int temp = atoi(optarg);
3565         if (temp > 0)
3566           config_alloc_chunk = temp;
3567         else
3568         {
3569           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3570           return 10;
3571         }
3572       }
3573       break;
3574
3575       case 'h':
3576       case '?':
3577         printf ("RRDCacheD %s\n"
3578             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3579             "\n"
3580             "Usage: rrdcached [options]\n"
3581             "\n"
3582             "Valid options are:\n"
3583             "  -l <address>  Socket address to listen to.\n"
3584             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3585             "  -P <perms>    Sets the permissions to assign to all following "
3586                             "sockets\n"
3587             "  -w <seconds>  Interval in which to write data.\n"
3588             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3589             "  -t <threads>  Number of write threads.\n"
3590             "  -f <seconds>  Interval in which to flush dead data.\n"
3591             "  -p <file>     Location of the PID-file.\n"
3592             "  -b <dir>      Base directory to change to.\n"
3593             "  -B            Restrict file access to paths within -b <dir>\n"
3594             "  -g            Do not fork and run in the foreground.\n"
3595             "  -j <dir>      Directory in which to create the journal files.\n"
3596             "  -F            Always flush all updates at shutdown\n"
3597             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3598             "                (the socket will also have read/write permissions "
3599                             "for that group)\n"
3600             "  -m <mode>     File permissions (octal) of all following UNIX "
3601                             "sockets\n"
3602             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3603             "  -O            Do not allow CREATE commands to overwrite existing\n"
3604             "                files, even if asked to.\n"
3605             "\n"
3606             "For more information and a detailed description of all options "
3607             "please refer\n"
3608             "to the rrdcached(1) manual page.\n",
3609             VERSION);
3610         if (option == 'h')
3611           status = -1;
3612         else
3613           status = 1;
3614         break;
3615     } /* switch (option) */
3616   } /* while (getopt) */
3617
3618   /* advise the user when values are not sane */
3619   if (config_flush_interval < 2 * config_write_interval)
3620     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3621             " 2x write interval (-w) !\n");
3622   if (config_write_jitter > config_write_interval)
3623     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3624             " write interval (-w) !\n");
3625
3626   if (config_write_base_only && config_base_dir == NULL)
3627     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3628             "  Consult the rrdcached documentation\n");
3629
3630   if (journal_dir == NULL)
3631     config_flush_at_shutdown = 1;
3632
3633   return (status);
3634 } /* }}} int read_options */
3635
3636 int main (int argc, char **argv)
3637 {
3638   int status;
3639
3640   status = read_options (argc, argv);
3641   if (status != 0)
3642   {
3643     if (status < 0)
3644       status = 0;
3645     return (status);
3646   }
3647
3648   status = daemonize ();
3649   if (status != 0)
3650   {
3651     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3652     return (1);
3653   }
3654
3655   journal_init();
3656
3657   /* start the queue threads */
3658   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3659   if (queue_threads == NULL)
3660   {
3661     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3662     cleanup();
3663     return (1);
3664   }
3665   for (int i = 0; i < config_queue_threads; i++)
3666   {
3667     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3668     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3669     if (status != 0)
3670     {
3671       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3672       cleanup();
3673       return (1);
3674     }
3675   }
3676
3677   /* start the flush thread */
3678   memset(&flush_thread, 0, sizeof(flush_thread));
3679   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3680   if (status != 0)
3681   {
3682     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3683     cleanup();
3684     return (1);
3685   }
3686
3687   listen_thread_main (NULL);
3688   cleanup ();
3689
3690   return (0);
3691 } /* int main */
3692
3693 /*
3694  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3695  */