fix win32 distributables
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66
67 #include "rrd_tool.h"
68 #include "rrd_client.h"
69 #include "unused.h"
70
71 #include <stdlib.h>
72
73 #ifndef WIN32
74 #ifdef HAVE_STDINT_H
75 #  include <stdint.h>
76 #endif
77 #include <unistd.h>
78 #include <strings.h>
79 #include <inttypes.h>
80 #include <sys/socket.h>
81
82 #else
83
84 #endif
85 #include <stdio.h>
86 #include <string.h>
87
88 #include <sys/types.h>
89 #include <sys/stat.h>
90 #include <dirent.h>
91 #include <fcntl.h>
92 #include <signal.h>
93 #include <sys/un.h>
94 #include <netdb.h>
95 #include <poll.h>
96 #include <syslog.h>
97 #include <pthread.h>
98 #include <errno.h>
99 #include <assert.h>
100 #include <sys/time.h>
101 #include <time.h>
102 #include <libgen.h>
103 #include <grp.h>
104
105 #ifdef HAVE_LIBWRAP
106 #include <tcpd.h>
107 #endif /* HAVE_LIBWRAP */
108
109 #include <glib-2.0/glib.h>
110 /* }}} */
111
112 #define RRDD_LOG(severity, ...) \
113   do { \
114     if (stay_foreground) { \
115       fprintf(stderr, __VA_ARGS__); \
116       fprintf(stderr, "\n"); } \
117     syslog ((severity), __VA_ARGS__); \
118   } while (0)
119
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124
125 struct listen_socket_s
126 {
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
130
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
134
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
139
140   char *wbuf;
141   ssize_t wbuf_len;
142
143   uint32_t permissions;
144
145   gid_t  socket_group;
146   mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
149
150 struct command_s;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
154                         time_t UNUSED(now),\
155                         char  UNUSED(*buffer),\
156                         size_t UNUSED(buffer_size)
157
158 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
159                         DISPATCH_PROTO
160
161 struct command_s {
162   char   *cmd;
163   int (*handler)(HANDLER_PROTO);
164
165   char  context;                /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT      (1<<0)
167 #define CMD_CONTEXT_BATCH       (1<<1)
168 #define CMD_CONTEXT_JOURNAL     (1<<2)
169 #define CMD_CONTEXT_ANY         (0x7f)
170
171   char *syntax;
172   char *help;
173 };
174
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s
178 {
179   char *file;
180   char **values;
181   size_t values_num;            /* number of valid pointers */
182   size_t values_alloc;          /* number of allocated pointers */
183   time_t last_flush_time;
184   double last_update_stamp;
185 #define CI_FLAGS_IN_TREE  (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
187   int flags;
188   pthread_cond_t  flushed;
189   cache_item_t *prev;
190   cache_item_t *next;
191 };
192
193 struct callback_flush_data_s
194 {
195   time_t now;
196   time_t abs_timeout;
197   char **keys;
198   size_t keys_num;
199 };
200 typedef struct callback_flush_data_s callback_flush_data_t;
201
202 enum queue_side_e
203 {
204   HEAD,
205   TAIL
206 };
207 typedef enum queue_side_e queue_side_t;
208
209 /* describe a set of journal files */
210 typedef struct {
211   char **files;
212   size_t files_num;
213 } journal_set;
214
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
216
217 /*
218  * Variables
219  */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
222
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
225
226 static listen_socket_t default_socket;
227
228 enum {
229   RUNNING,              /* normal operation */
230   FLUSHING,             /* flushing remaining values */
231   SHUTDOWN              /* shutting down */
232 } state = RUNNING;
233
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
237
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
240
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
244
245 /* Cache stuff */
246 static GTree          *cache_tree = NULL;
247 static cache_item_t   *cache_queue_head = NULL;
248 static cache_item_t   *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
250
251 static int config_write_interval = 300;
252 static int config_write_jitter   = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
260
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
263
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
272
273 static int opt_no_overwrite = 0; /* default for the daemon */
274
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL;         /* current journal file handle */
282 static long  journal_size = 0;          /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
288
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
291
292 /* 
293  * Functions
294  */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298   state = FLUSHING;
299   pthread_cond_broadcast(&flush_cond);
300   pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
302
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
304 {
305   sig_common("INT");
306 } /* }}} void sig_int_handler */
307
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
309 {
310   sig_common("TERM");
311 } /* }}} void sig_term_handler */
312
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
314 {
315   config_flush_at_shutdown = 1;
316   sig_common("USR1");
317 } /* }}} void sig_usr1_handler */
318
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
320 {
321   config_flush_at_shutdown = 0;
322   sig_common("USR2");
323 } /* }}} void sig_usr2_handler */
324
325 static void install_signal_handlers(void) /* {{{ */
326 {
327   /* These structures are static, because `sigaction' behaves weird if the are
328    * overwritten.. */
329   static struct sigaction sa_int;
330   static struct sigaction sa_term;
331   static struct sigaction sa_pipe;
332   static struct sigaction sa_usr1;
333   static struct sigaction sa_usr2;
334
335   /* Install signal handlers */
336   memset (&sa_int, 0, sizeof (sa_int));
337   sa_int.sa_handler = sig_int_handler;
338   sigaction (SIGINT, &sa_int, NULL);
339
340   memset (&sa_term, 0, sizeof (sa_term));
341   sa_term.sa_handler = sig_term_handler;
342   sigaction (SIGTERM, &sa_term, NULL);
343
344   memset (&sa_pipe, 0, sizeof (sa_pipe));
345   sa_pipe.sa_handler = SIG_IGN;
346   sigaction (SIGPIPE, &sa_pipe, NULL);
347
348   memset (&sa_pipe, 0, sizeof (sa_usr1));
349   sa_usr1.sa_handler = sig_usr1_handler;
350   sigaction (SIGUSR1, &sa_usr1, NULL);
351
352   memset (&sa_usr2, 0, sizeof (sa_usr2));
353   sa_usr2.sa_handler = sig_usr2_handler;
354   sigaction (SIGUSR2, &sa_usr2, NULL);
355
356 } /* }}} void install_signal_handlers */
357
358 static int open_pidfile(char *action, int oflag) /* {{{ */
359 {
360   int fd;
361   const char *file;
362   char *file_copy, *dir;
363
364   file = (config_pid_file != NULL)
365     ? config_pid_file
366     : LOCALSTATEDIR "/run/rrdcached.pid";
367
368   /* dirname may modify its argument */
369   file_copy = strdup(file);
370   if (file_copy == NULL)
371   {
372     fprintf(stderr, "rrdcached: strdup(): %s\n",
373         rrd_strerror(errno));
374     return -1;
375   }
376
377   dir = dirname(file_copy);
378   if (rrd_mkdir_p(dir, 0777) != 0)
379   {
380     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381         dir, rrd_strerror(errno));
382     return -1;
383   }
384
385   free(file_copy);
386
387   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
388   if (fd < 0)
389     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390             action, file, rrd_strerror(errno));
391
392   return(fd);
393 } /* }}} static int open_pidfile */
394
395 /* check existing pid file to see whether a daemon is running */
396 static int check_pidfile(void)
397 {
398   int pid_fd;
399   pid_t pid;
400   char pid_str[16];
401
402   pid_fd = open_pidfile("open", O_RDWR);
403   if (pid_fd < 0)
404     return pid_fd;
405
406   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
407     return -1;
408
409   pid = atoi(pid_str);
410   if (pid <= 0)
411     return -1;
412
413   /* another running process that we can signal COULD be
414    * a competing rrdcached */
415   if (pid != getpid() && kill(pid, 0) == 0)
416   {
417     fprintf(stderr,
418             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
419     close(pid_fd);
420     return -1;
421   }
422
423   lseek(pid_fd, 0, SEEK_SET);
424   if (ftruncate(pid_fd, 0) == -1)
425   {
426     fprintf(stderr,
427             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
431
432   fprintf(stderr,
433           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434           "rrdcached: starting normally.\n", pid);
435
436   return pid_fd;
437 } /* }}} static int check_pidfile */
438
439 static int write_pidfile (int fd) /* {{{ */
440 {
441   pid_t pid;
442   FILE *fh;
443
444   pid = getpid ();
445
446   fh = fdopen (fd, "w");
447   if (fh == NULL)
448   {
449     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
450     close(fd);
451     return (-1);
452   }
453
454   fprintf (fh, "%i\n", (int) pid);
455   fclose (fh);
456
457   return (0);
458 } /* }}} int write_pidfile */
459
460 static int remove_pidfile (void) /* {{{ */
461 {
462   char *file;
463   int status;
464
465   file = (config_pid_file != NULL)
466     ? config_pid_file
467     : LOCALSTATEDIR "/run/rrdcached.pid";
468
469   status = unlink (file);
470   if (status == 0)
471     return (0);
472   return (errno);
473 } /* }}} int remove_pidfile */
474
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
476 {
477   char *eol;
478
479   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480                sock->next_read - sock->next_cmd);
481
482   if (eol == NULL)
483   {
484     /* no commands left, move remainder back to front of rbuf */
485     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486             sock->next_read - sock->next_cmd);
487     sock->next_read -= sock->next_cmd;
488     sock->next_cmd = 0;
489     *len = 0;
490     return NULL;
491   }
492   else
493   {
494     char *cmd = sock->rbuf + sock->next_cmd;
495     *eol = '\0';
496
497     sock->next_cmd = eol - sock->rbuf + 1;
498
499     if (eol > sock->rbuf && *(eol-1) == '\r')
500       *(--eol) = '\0'; /* handle "\r\n" EOL */
501
502     *len = eol - cmd;
503
504     return cmd;
505   }
506
507   /* NOTREACHED */
508   assert(1==0);
509 } /* }}} char *next_cmd */
510
511 /* add the characters directly to the write buffer */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
513 {
514   char *new_buf;
515
516   assert(sock != NULL);
517
518   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
519   if (new_buf == NULL)
520   {
521     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
522     return -1;
523   }
524
525   strncpy(new_buf + sock->wbuf_len, str, len + 1);
526
527   sock->wbuf = new_buf;
528   sock->wbuf_len += len;
529
530   return 0;
531 } /* }}} static int add_to_wbuf */
532
533 /* add the text to the "extra" info that's sent after the status line */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
535 {
536   va_list argp;
537   char buffer[RRD_CMD_MAX];
538   int len;
539
540   if (JOURNAL_REPLAY(sock)) return 0;
541   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
542
543   va_start(argp, fmt);
544 #ifdef HAVE_VSNPRINTF
545   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
546 #else
547   len = vsprintf(buffer, fmt, argp);
548 #endif
549   va_end(argp);
550   if (len < 0)
551   {
552     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
553     return -1;
554   }
555
556   return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
558
559 static int count_lines(char *str) /* {{{ */
560 {
561   int lines = 0;
562
563   if (str != NULL)
564   {
565     while ((str = strchr(str, '\n')) != NULL)
566     {
567       ++lines;
568       ++str;
569     }
570   }
571
572   return lines;
573 } /* }}} static int count_lines */
574
575 /* send the response back to the user.
576  * returns 0 on success, -1 on error
577  * write buffer is always zeroed after this call */
578 static int send_response (listen_socket_t *sock, response_code rc,
579                           char *fmt, ...) /* {{{ */
580 {
581   va_list argp;
582   char buffer[RRD_CMD_MAX];
583   int lines;
584   ssize_t wrote;
585   int rclen, len;
586
587   if (JOURNAL_REPLAY(sock)) return rc;
588
589   if (sock->batch_start)
590   {
591     if (rc == RESP_OK)
592       return rc; /* no response on success during BATCH */
593     lines = sock->batch_cmd;
594   }
595   else if (rc == RESP_OK)
596     lines = count_lines(sock->wbuf);
597   else
598     lines = -1;
599
600   rclen = sprintf(buffer, "%d ", lines);
601   va_start(argp, fmt);
602 #ifdef HAVE_VSNPRINTF
603   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
604 #else
605   len = vsprintf(buffer+rclen, fmt, argp);
606 #endif
607   va_end(argp);
608   if (len < 0)
609     return -1;
610
611   len += rclen;
612
613   /* append the result to the wbuf, don't write to the user */
614   if (sock->batch_start)
615     return add_to_wbuf(sock, buffer, len);
616
617   /* first write must be complete */
618   if (len != write(sock->fd, buffer, len))
619   {
620     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
621     return -1;
622   }
623
624   if (sock->wbuf != NULL && rc == RESP_OK)
625   {
626     wrote = 0;
627     while (wrote < sock->wbuf_len)
628     {
629       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
630       if (wb <= 0)
631       {
632         RRDD_LOG(LOG_INFO, "send_response: could not write results");
633         return -1;
634       }
635       wrote += wb;
636     }
637   }
638
639   free(sock->wbuf); sock->wbuf = NULL;
640   sock->wbuf_len = 0;
641
642   return 0;
643 } /* }}} */
644
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
646 {
647   ci->values = NULL;
648   ci->values_num = 0;
649   ci->values_alloc = 0;
650
651   ci->last_flush_time = when;
652   if (config_write_jitter > 0)
653     ci->last_flush_time += (rrd_random() % config_write_jitter);
654 }
655
656 /* remove_from_queue
657  * remove a "cache_item_t" item from the queue.
658  * must hold 'cache_lock' when calling this
659  */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
661 {
662   if (ci == NULL) return;
663   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
664
665   if (ci->prev == NULL)
666     cache_queue_head = ci->next; /* reset head */
667   else
668     ci->prev->next = ci->next;
669
670   if (ci->next == NULL)
671     cache_queue_tail = ci->prev; /* reset the tail */
672   else
673     ci->next->prev = ci->prev;
674
675   ci->next = ci->prev = NULL;
676   ci->flags &= ~CI_FLAGS_IN_QUEUE;
677
678   pthread_mutex_lock (&stats_lock);
679   assert (stats_queue_length > 0);
680   stats_queue_length--;
681   pthread_mutex_unlock (&stats_lock);
682
683 } /* }}} static void remove_from_queue */
684
685 /* free the resources associated with the cache_item_t
686  * must hold cache_lock when calling this function
687  */
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
689 {
690   if (ci == NULL) return NULL;
691
692   remove_from_queue(ci);
693
694   for (size_t i=0; i < ci->values_num; i++)
695     free(ci->values[i]);
696
697   free (ci->values);
698   free (ci->file);
699
700   /* in case anyone is waiting */
701   pthread_cond_broadcast(&ci->flushed);
702   pthread_cond_destroy(&ci->flushed);
703
704   free (ci);
705
706   return NULL;
707 } /* }}} static void *free_cache_item */
708
709 /*
710  * enqueue_cache_item:
711  * `cache_lock' must be acquired before calling this function!
712  */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
714     queue_side_t side)
715 {
716   if (ci == NULL)
717     return (-1);
718
719   if (ci->values_num == 0)
720     return (0);
721
722   if (side == HEAD)
723   {
724     if (cache_queue_head == ci)
725       return 0;
726
727     /* remove if further down in queue */
728     remove_from_queue(ci);
729
730     ci->prev = NULL;
731     ci->next = cache_queue_head;
732     if (ci->next != NULL)
733       ci->next->prev = ci;
734     cache_queue_head = ci;
735
736     if (cache_queue_tail == NULL)
737       cache_queue_tail = cache_queue_head;
738   }
739   else /* (side == TAIL) */
740   {
741     /* We don't move values back in the list.. */
742     if (ci->flags & CI_FLAGS_IN_QUEUE)
743       return (0);
744
745     assert (ci->next == NULL);
746     assert (ci->prev == NULL);
747
748     ci->prev = cache_queue_tail;
749
750     if (cache_queue_tail == NULL)
751       cache_queue_head = ci;
752     else
753       cache_queue_tail->next = ci;
754
755     cache_queue_tail = ci;
756   }
757
758   ci->flags |= CI_FLAGS_IN_QUEUE;
759
760   pthread_cond_signal(&queue_cond);
761   pthread_mutex_lock (&stats_lock);
762   stats_queue_length++;
763   pthread_mutex_unlock (&stats_lock);
764
765   return (0);
766 } /* }}} int enqueue_cache_item */
767
768 /*
769  * tree_callback_flush:
770  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771  * while this is in progress.
772  */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
774     gpointer data)
775 {
776   cache_item_t *ci;
777   callback_flush_data_t *cfd;
778
779   ci = (cache_item_t *) value;
780   cfd = (callback_flush_data_t *) data;
781
782   if (ci->flags & CI_FLAGS_IN_QUEUE)
783     return FALSE;
784
785   if (ci->values_num > 0
786       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
787   {
788     enqueue_cache_item (ci, TAIL);
789   }
790   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791       && (ci->values_num <= 0))
792   {
793     assert ((char *) key == ci->file);
794     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
795     {
796       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
797       return (FALSE);
798     }
799   }
800
801   return (FALSE);
802 } /* }}} gboolean tree_callback_flush */
803
804 static int flush_old_values (int max_age)
805 {
806   callback_flush_data_t cfd;
807   size_t k;
808
809   memset (&cfd, 0, sizeof (cfd));
810   /* Pass the current time as user data so that we don't need to call
811    * `time' for each node. */
812   cfd.now = time (NULL);
813   cfd.keys = NULL;
814   cfd.keys_num = 0;
815
816   if (max_age > 0)
817     cfd.abs_timeout = cfd.now - max_age;
818   else
819     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
820
821   /* `tree_callback_flush' will return the keys of all values that haven't
822    * been touched in the last `config_flush_interval' seconds in `cfd'.
823    * The char*'s in this array point to the same memory as ci->file, so we
824    * don't need to free them separately. */
825   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
826
827   for (k = 0; k < cfd.keys_num; k++)
828   {
829     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830     /* should never fail, since we have held the cache_lock
831      * the entire time */
832     assert(status == TRUE);
833   }
834
835   if (cfd.keys != NULL)
836   {
837     free (cfd.keys);
838     cfd.keys = NULL;
839   }
840
841   return (0);
842 } /* int flush_old_values */
843
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
845 {
846   struct timeval now;
847   struct timespec next_flush;
848   int status;
849
850   gettimeofday (&now, NULL);
851   next_flush.tv_sec = now.tv_sec + config_flush_interval;
852   next_flush.tv_nsec = 1000 * now.tv_usec;
853
854   pthread_mutex_lock(&cache_lock);
855
856   while (state == RUNNING)
857   {
858     gettimeofday (&now, NULL);
859     if ((now.tv_sec > next_flush.tv_sec)
860         || ((now.tv_sec == next_flush.tv_sec)
861           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
862     {
863       RRDD_LOG(LOG_DEBUG, "flushing old values");
864
865       /* Determine the time of the next cache flush. */
866       next_flush.tv_sec = now.tv_sec + config_flush_interval;
867
868       /* Flush all values that haven't been written in the last
869        * `config_write_interval' seconds. */
870       flush_old_values (config_write_interval);
871
872       /* unlock the cache while we rotate so we don't block incoming
873        * updates if the fsync() blocks on disk I/O */
874       pthread_mutex_unlock(&cache_lock);
875       journal_rotate();
876       pthread_mutex_lock(&cache_lock);
877     }
878
879     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880     if (status != 0 && status != ETIMEDOUT)
881     {
882       RRDD_LOG (LOG_ERR, "flush_thread_main: "
883                 "pthread_cond_timedwait returned %i.", status);
884     }
885   }
886
887   if (config_flush_at_shutdown)
888     flush_old_values (-1); /* flush everything */
889
890   state = SHUTDOWN;
891
892   pthread_mutex_unlock(&cache_lock);
893
894   return NULL;
895 } /* void *flush_thread_main */
896
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
898 {
899   pthread_mutex_lock (&cache_lock);
900
901   while (state != SHUTDOWN
902          || (cache_queue_head != NULL && config_flush_at_shutdown))
903   {
904     cache_item_t *ci;
905     char *file;
906     char **values;
907     size_t values_num;
908     int status;
909
910     /* Now, check if there's something to store away. If not, wait until
911      * something comes in. */
912     if (cache_queue_head == NULL)
913     {
914       status = pthread_cond_wait (&queue_cond, &cache_lock);
915       if ((status != 0) && (status != ETIMEDOUT))
916       {
917         RRDD_LOG (LOG_ERR, "queue_thread_main: "
918             "pthread_cond_wait returned %i.", status);
919       }
920     }
921
922     /* Check if a value has arrived. This may be NULL if we timed out or there
923      * was an interrupt such as a signal. */
924     if (cache_queue_head == NULL)
925       continue;
926
927     ci = cache_queue_head;
928
929     /* copy the relevant parts */
930     file = strdup (ci->file);
931     if (file == NULL)
932     {
933       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
934       continue;
935     }
936
937     assert(ci->values != NULL);
938     assert(ci->values_num > 0);
939
940     values = ci->values;
941     values_num = ci->values_num;
942
943     wipe_ci_values(ci, time(NULL));
944     remove_from_queue(ci);
945
946     pthread_mutex_unlock (&cache_lock);
947
948     rrd_clear_error ();
949     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
950     if (status != 0)
951     {
952       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953           "rrd_update_r (%s) failed with status %i. (%s)",
954           file, status, rrd_get_error());
955     }
956
957     journal_write("wrote", file);
958
959     /* Search again in the tree.  It's possible someone issued a "FORGET"
960      * while we were writing the update values. */
961     pthread_mutex_lock(&cache_lock);
962     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
963     if (ci)
964       pthread_cond_broadcast(&ci->flushed);
965     pthread_mutex_unlock(&cache_lock);
966
967     if (status == 0)
968     {
969       pthread_mutex_lock (&stats_lock);
970       stats_updates_written++;
971       stats_data_sets_written += values_num;
972       pthread_mutex_unlock (&stats_lock);
973     }
974
975     rrd_free_ptrs((void ***) &values, &values_num);
976     free(file);
977
978     pthread_mutex_lock (&cache_lock);
979   }
980   pthread_mutex_unlock (&cache_lock);
981
982   return (NULL);
983 } /* }}} void *queue_thread_main */
984
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986     size_t *buffer_size_ret, char **field_ret)
987 {
988   char *buffer;
989   size_t buffer_pos;
990   size_t buffer_size;
991   char *field;
992   size_t field_size;
993   int status;
994
995   buffer = *buffer_ret;
996   buffer_pos = 0;
997   buffer_size = *buffer_size_ret;
998   field = *buffer_ret;
999   field_size = 0;
1000
1001   if (buffer_size <= 0)
1002     return (-1);
1003
1004   /* This is ensured by `handle_request'. */
1005   assert (buffer[buffer_size - 1] == '\0');
1006
1007   status = -1;
1008   while (buffer_pos < buffer_size)
1009   {
1010     /* Check for end-of-field or end-of-buffer */
1011     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1012     {
1013       field[field_size] = 0;
1014       field_size++;
1015       buffer_pos++;
1016       status = 0;
1017       break;
1018     }
1019     /* Handle escaped characters. */
1020     else if (buffer[buffer_pos] == '\\')
1021     {
1022       if (buffer_pos >= (buffer_size - 1))
1023         break;
1024       buffer_pos++;
1025       field[field_size] = buffer[buffer_pos];
1026       field_size++;
1027       buffer_pos++;
1028     }
1029     /* Normal operation */ 
1030     else
1031     {
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036   } /* while (buffer_pos < buffer_size) */
1037
1038   if (status != 0)
1039     return (status);
1040
1041   *buffer_ret = buffer + buffer_pos;
1042   *buffer_size_ret = buffer_size - buffer_pos;
1043   *field_ret = field;
1044
1045   return (0);
1046 } /* }}} int buffer_get_field */
1047
1048 /* if we're restricting writes to the base directory,
1049  * check whether the file falls within the dir
1050  * returns 1 if OK, otherwise 0
1051  */
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1053 {
1054   assert(file != NULL);
1055
1056   if (!config_write_base_only
1057       || JOURNAL_REPLAY(sock)
1058       || config_base_dir == NULL)
1059     return 1;
1060
1061   if (strstr(file, "../") != NULL) goto err;
1062
1063   /* relative paths without "../" are ok */
1064   if (*file != '/') return 1;
1065
1066   /* file must be of the format base + "/" + <1+ char filename> */
1067   if (strlen(file) < _config_base_dir_len + 2) goto err;
1068   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069   if (*(file + _config_base_dir_len) != '/') goto err;
1070
1071   return 1;
1072
1073 err:
1074   if (sock != NULL && sock->fd >= 0)
1075     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1076
1077   return 0;
1078 } /* }}} static int check_file_access */
1079
1080 /* when using a base dir, convert relative paths to absolute paths.
1081  * if necessary, modifies the "filename" pointer to point
1082  * to the new path created in "tmp".  "tmp" is provided
1083  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1084  *
1085  * this allows us to optimize for the expected case (absolute path)
1086  * with a no-op.
1087  */
1088 static void get_abs_path(char **filename, char *tmp)
1089 {
1090   assert(tmp != NULL);
1091   assert(filename != NULL && *filename != NULL);
1092
1093   if (config_base_dir == NULL || **filename == '/')
1094     return;
1095
1096   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1097   *filename = tmp;
1098 } /* }}} static int get_abs_path */
1099
1100 static int flush_file (const char *filename) /* {{{ */
1101 {
1102   cache_item_t *ci;
1103
1104   pthread_mutex_lock (&cache_lock);
1105
1106   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1107   if (ci == NULL)
1108   {
1109     pthread_mutex_unlock (&cache_lock);
1110     return (ENOENT);
1111   }
1112
1113   if (ci->values_num > 0)
1114   {
1115     /* Enqueue at head */
1116     enqueue_cache_item (ci, HEAD);
1117     pthread_cond_wait(&ci->flushed, &cache_lock);
1118   }
1119
1120   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1121    * may have been purged during our cond_wait() */
1122
1123   pthread_mutex_unlock(&cache_lock);
1124
1125   return (0);
1126 } /* }}} int flush_file */
1127
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1129 {
1130   char *err = "Syntax error.\n";
1131
1132   if (cmd && cmd->syntax)
1133     err = cmd->syntax;
1134
1135   return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
1137
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1139 {
1140   uint64_t copy_queue_length;
1141   uint64_t copy_updates_received;
1142   uint64_t copy_flush_received;
1143   uint64_t copy_updates_written;
1144   uint64_t copy_data_sets_written;
1145   uint64_t copy_journal_bytes;
1146   uint64_t copy_journal_rotate;
1147
1148   uint64_t tree_nodes_number;
1149   uint64_t tree_depth;
1150
1151   pthread_mutex_lock (&stats_lock);
1152   copy_queue_length       = stats_queue_length;
1153   copy_updates_received   = stats_updates_received;
1154   copy_flush_received     = stats_flush_received;
1155   copy_updates_written    = stats_updates_written;
1156   copy_data_sets_written  = stats_data_sets_written;
1157   copy_journal_bytes      = stats_journal_bytes;
1158   copy_journal_rotate     = stats_journal_rotate;
1159   pthread_mutex_unlock (&stats_lock);
1160
1161   pthread_mutex_lock (&cache_lock);
1162   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1164   pthread_mutex_unlock (&cache_lock);
1165
1166   add_response_info(sock,
1167                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1168   add_response_info(sock,
1169                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170   add_response_info(sock,
1171                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172   add_response_info(sock,
1173                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174   add_response_info(sock,
1175                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1180
1181   send_response(sock, RESP_OK, "Statistics follow\n");
1182
1183   return (0);
1184 } /* }}} int handle_request_stats */
1185
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1187 {
1188   char *file, file_tmp[PATH_MAX];
1189   int status;
1190
1191   status = buffer_get_field (&buffer, &buffer_size, &file);
1192   if (status != 0)
1193   {
1194     return syntax_error(sock,cmd);
1195   }
1196   else
1197   {
1198     pthread_mutex_lock(&stats_lock);
1199     stats_flush_received++;
1200     pthread_mutex_unlock(&stats_lock);
1201
1202     get_abs_path(&file, file_tmp);
1203     if (!check_file_access(file, sock)) return 0;
1204
1205     status = flush_file (file);
1206     if (status == 0)
1207       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208     else if (status == ENOENT)
1209     {
1210       /* no file in our tree; see whether it exists at all */
1211       struct stat statbuf;
1212
1213       memset(&statbuf, 0, sizeof(statbuf));
1214       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1216       else
1217         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1218     }
1219     else if (status < 0)
1220       return send_response(sock, RESP_ERR, "Internal error.\n");
1221     else
1222       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1223   }
1224
1225   /* NOTREACHED */
1226   assert(1==0);
1227 } /* }}} int handle_request_flush */
1228
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1230 {
1231   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1232
1233   pthread_mutex_lock(&cache_lock);
1234   flush_old_values(-1);
1235   pthread_mutex_unlock(&cache_lock);
1236
1237   return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
1239
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1241 {
1242   int status;
1243   char *file, file_tmp[PATH_MAX];
1244   cache_item_t *ci;
1245
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1249
1250   get_abs_path(&file, file_tmp);
1251
1252   pthread_mutex_lock(&cache_lock);
1253   ci = g_tree_lookup(cache_tree, file);
1254   if (ci == NULL)
1255   {
1256     pthread_mutex_unlock(&cache_lock);
1257     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1258   }
1259
1260   for (size_t i=0; i < ci->values_num; i++)
1261     add_response_info(sock, "%s\n", ci->values[i]);
1262
1263   pthread_mutex_unlock(&cache_lock);
1264   return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
1266
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1268 {
1269   int status;
1270   gboolean found;
1271   char *file, file_tmp[PATH_MAX];
1272
1273   status = buffer_get_field(&buffer, &buffer_size, &file);
1274   if (status != 0)
1275     return syntax_error(sock,cmd);
1276
1277   get_abs_path(&file, file_tmp);
1278   if (!check_file_access(file, sock)) return 0;
1279
1280   pthread_mutex_lock(&cache_lock);
1281   found = g_tree_remove(cache_tree, file);
1282   pthread_mutex_unlock(&cache_lock);
1283
1284   if (found == TRUE)
1285   {
1286     if (!JOURNAL_REPLAY(sock))
1287       journal_write("forget", file);
1288
1289     return send_response(sock, RESP_OK, "Gone!\n");
1290   }
1291   else
1292     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1293
1294   /* NOTREACHED */
1295   assert(1==0);
1296 } /* }}} static int handle_request_forget */
1297
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1299 {
1300   cache_item_t *ci;
1301
1302   pthread_mutex_lock(&cache_lock);
1303
1304   ci = cache_queue_head;
1305   while (ci != NULL)
1306   {
1307     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1308     ci = ci->next;
1309   }
1310
1311   pthread_mutex_unlock(&cache_lock);
1312
1313   return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
1315
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1317 {
1318   char *file, file_tmp[PATH_MAX];
1319   int values_num = 0;
1320   int status;
1321   char orig_buf[RRD_CMD_MAX];
1322
1323   cache_item_t *ci;
1324
1325   /* save it for the journal later */
1326   if (!JOURNAL_REPLAY(sock))
1327     strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1328
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return syntax_error(sock,cmd);
1332
1333   pthread_mutex_lock(&stats_lock);
1334   stats_updates_received++;
1335   pthread_mutex_unlock(&stats_lock);
1336
1337   get_abs_path(&file, file_tmp);
1338   if (!check_file_access(file, sock)) return 0;
1339
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1342
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346     cache_item_t *tmp;
1347
1348     /* don't hold the lock while we setup; stat(2) might block */
1349     pthread_mutex_unlock(&cache_lock);
1350
1351     memset (&statbuf, 0, sizeof (statbuf));
1352     status = stat (file, &statbuf);
1353     if (status != 0)
1354     {
1355       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1356
1357       status = errno;
1358       if (status == ENOENT)
1359         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1360       else
1361         return send_response(sock, RESP_ERR,
1362                              "stat failed with error %i.\n", status);
1363     }
1364     if (!S_ISREG (statbuf.st_mode))
1365       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1366
1367     if (access(file, R_OK|W_OK) != 0)
1368       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369                            file, rrd_strerror(errno));
1370
1371     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1372     if (ci == NULL)
1373     {
1374       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1375
1376       return send_response(sock, RESP_ERR, "malloc failed.\n");
1377     }
1378     memset (ci, 0, sizeof (cache_item_t));
1379
1380     ci->file = strdup (file);
1381     if (ci->file == NULL)
1382     {
1383       free (ci);
1384       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1385
1386       return send_response(sock, RESP_ERR, "strdup failed.\n");
1387     }
1388
1389     wipe_ci_values(ci, now);
1390     ci->flags = CI_FLAGS_IN_TREE;
1391     pthread_cond_init(&ci->flushed, NULL);
1392
1393     pthread_mutex_lock(&cache_lock);
1394
1395     /* another UPDATE might have added this entry in the meantime */
1396     tmp = g_tree_lookup (cache_tree, file);
1397     if (tmp == NULL)
1398       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1399     else
1400     {
1401       free_cache_item (ci);
1402       ci = tmp;
1403     }
1404
1405     /* state may have changed while we were unlocked */
1406     if (state == SHUTDOWN)
1407       return -1;
1408   } /* }}} */
1409   assert (ci != NULL);
1410
1411   /* don't re-write updates in replay mode */
1412   if (!JOURNAL_REPLAY(sock))
1413     journal_write("update", orig_buf);
1414
1415   while (buffer_size > 0)
1416   {
1417     char *value;
1418     double stamp;
1419     char *eostamp;
1420
1421     status = buffer_get_field (&buffer, &buffer_size, &value);
1422     if (status != 0)
1423     {
1424       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1425       break;
1426     }
1427
1428     /* make sure update time is always moving forward. We use double here since
1429        update does support subsecond precision for timestamps ... */
1430     stamp = strtod(value, &eostamp);
1431     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1432     {
1433       pthread_mutex_unlock(&cache_lock);
1434       return send_response(sock, RESP_ERR,
1435                            "Cannot find timestamp in '%s'!\n", value);
1436     }
1437     else if (stamp <= ci->last_update_stamp)
1438     {
1439       pthread_mutex_unlock(&cache_lock);
1440       return send_response(sock, RESP_ERR,
1441                            "illegal attempt to update using time %lf when last"
1442                            " update time is %lf (minimum one second step)\n",
1443                            stamp, ci->last_update_stamp);
1444     }
1445     else
1446       ci->last_update_stamp = stamp;
1447
1448     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1449                               &ci->values_alloc, config_alloc_chunk))
1450     {
1451       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1452       continue;
1453     }
1454
1455     values_num++;
1456   }
1457
1458   if (((now - ci->last_flush_time) >= config_write_interval)
1459       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1460       && (ci->values_num > 0))
1461   {
1462     enqueue_cache_item (ci, TAIL);
1463   }
1464
1465   pthread_mutex_unlock (&cache_lock);
1466
1467   if (values_num < 1)
1468     return send_response(sock, RESP_ERR, "No values updated.\n");
1469   else
1470     return send_response(sock, RESP_OK,
1471                          "errors, enqueued %i value(s).\n", values_num);
1472
1473   /* NOTREACHED */
1474   assert(1==0);
1475
1476 } /* }}} int handle_request_update */
1477
1478 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1479 {
1480   char *file, file_tmp[PATH_MAX];
1481   char *cf;
1482
1483   char *start_str;
1484   char *end_str;
1485   time_t start_tm;
1486   time_t end_tm;
1487
1488   unsigned long step;
1489   unsigned long ds_cnt;
1490   char **ds_namv;
1491   rrd_value_t *data;
1492
1493   int status;
1494   unsigned long i;
1495   time_t t;
1496   rrd_value_t *data_ptr;
1497
1498   file = NULL;
1499   cf = NULL;
1500   start_str = NULL;
1501   end_str = NULL;
1502
1503   /* Read the arguments */
1504   do /* while (0) */
1505   {
1506     status = buffer_get_field (&buffer, &buffer_size, &file);
1507     if (status != 0)
1508       break;
1509
1510     status = buffer_get_field (&buffer, &buffer_size, &cf);
1511     if (status != 0)
1512       break;
1513
1514     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1515     if (status != 0)
1516     {
1517       start_str = NULL;
1518       status = 0;
1519       break;
1520     }
1521
1522     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1523     if (status != 0)
1524     {
1525       end_str = NULL;
1526       status = 0;
1527       break;
1528     }
1529   } while (0);
1530
1531   if (status != 0)
1532     return (syntax_error(sock,cmd));
1533
1534   get_abs_path(&file, file_tmp);
1535   if (!check_file_access(file, sock)) return 0;
1536
1537   status = flush_file (file);
1538   if ((status != 0) && (status != ENOENT))
1539     return (send_response (sock, RESP_ERR,
1540           "flush_file (%s) failed with status %i.\n", file, status));
1541
1542   t = time (NULL); /* "now" */
1543
1544   /* Parse start time */
1545   if (start_str != NULL)
1546   {
1547     char *endptr;
1548     long value;
1549
1550     endptr = NULL;
1551     errno = 0;
1552     value = strtol (start_str, &endptr, /* base = */ 0);
1553     if ((endptr == start_str) || (errno != 0))
1554       return (send_response(sock, RESP_ERR,
1555             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1556             start_str));
1557
1558     if (value > 0)
1559       start_tm = (time_t) value;
1560     else
1561       start_tm = (time_t) (t + value);
1562   }
1563   else
1564   {
1565     start_tm = t - 86400;
1566   }
1567
1568   /* Parse end time */
1569   if (end_str != NULL)
1570   {
1571     char *endptr;
1572     long value;
1573
1574     endptr = NULL;
1575     errno = 0;
1576     value = strtol (end_str, &endptr, /* base = */ 0);
1577     if ((endptr == end_str) || (errno != 0))
1578       return (send_response(sock, RESP_ERR,
1579             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1580             end_str));
1581
1582     if (value > 0)
1583       end_tm = (time_t) value;
1584     else
1585       end_tm = (time_t) (t + value);
1586   }
1587   else
1588   {
1589     end_tm = t;
1590   }
1591
1592   step = -1;
1593   ds_cnt = 0;
1594   ds_namv = NULL;
1595   data = NULL;
1596
1597   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1598       &ds_cnt, &ds_namv, &data);
1599   if (status != 0)
1600     return (send_response(sock, RESP_ERR,
1601           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1602
1603   add_response_info (sock, "FlushVersion: %lu\n", 1);
1604   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1605   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1606   add_response_info (sock, "Step: %lu\n", step);
1607   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1608
1609 #define SSTRCAT(buffer,str,buffer_fill) do { \
1610     size_t str_len = strlen (str); \
1611     if ((buffer_fill + str_len) > sizeof (buffer)) \
1612       str_len = sizeof (buffer) - buffer_fill; \
1613     if (str_len > 0) { \
1614       strncpy (buffer + buffer_fill, str, str_len); \
1615       buffer_fill += str_len; \
1616       assert (buffer_fill <= sizeof (buffer)); \
1617       if (buffer_fill == sizeof (buffer)) \
1618         buffer[buffer_fill - 1] = 0; \
1619       else \
1620         buffer[buffer_fill] = 0; \
1621     } \
1622   } while (0)
1623
1624   { /* Add list of DS names */
1625     char linebuf[1024];
1626     size_t linebuf_fill;
1627
1628     memset (linebuf, 0, sizeof (linebuf));
1629     linebuf_fill = 0;
1630     for (i = 0; i < ds_cnt; i++)
1631     {
1632       if (i > 0)
1633         SSTRCAT (linebuf, " ", linebuf_fill);
1634       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1635       rrd_freemem(ds_namv[i]);
1636     }
1637     rrd_freemem(ds_namv);
1638     add_response_info (sock, "DSName: %s\n", linebuf);
1639   }
1640
1641   /* Add the actual data */
1642   assert (step > 0);
1643   data_ptr = data;
1644   for (t = start_tm + step; t <= end_tm; t += step)
1645   {
1646     char linebuf[1024];
1647     size_t linebuf_fill;
1648     char tmp[128];
1649
1650     memset (linebuf, 0, sizeof (linebuf));
1651     linebuf_fill = 0;
1652     for (i = 0; i < ds_cnt; i++)
1653     {
1654       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1655       tmp[sizeof (tmp) - 1] = 0;
1656       SSTRCAT (linebuf, tmp, linebuf_fill);
1657
1658       data_ptr++;
1659     }
1660
1661     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1662   } /* for (t) */
1663   rrd_freemem(data);
1664
1665   return (send_response (sock, RESP_OK, "Success\n"));
1666 #undef SSTRCAT
1667 } /* }}} int handle_request_fetch */
1668
1669 /* we came across a "WROTE" entry during journal replay.
1670  * throw away any values that we have accumulated for this file
1671  */
1672 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1673 {
1674   cache_item_t *ci;
1675   const char *file = buffer;
1676
1677   pthread_mutex_lock(&cache_lock);
1678
1679   ci = g_tree_lookup(cache_tree, file);
1680   if (ci == NULL)
1681   {
1682     pthread_mutex_unlock(&cache_lock);
1683     return (0);
1684   }
1685
1686   if (ci->values)
1687     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1688
1689   wipe_ci_values(ci, now);
1690   remove_from_queue(ci);
1691
1692   pthread_mutex_unlock(&cache_lock);
1693   return (0);
1694 } /* }}} int handle_request_wrote */
1695
1696 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1697 {
1698   char *file, file_tmp[PATH_MAX];
1699   int status;
1700   rrd_info_t *info;
1701
1702   /* obtain filename */
1703   status = buffer_get_field(&buffer, &buffer_size, &file);
1704   if (status != 0)
1705     return syntax_error(sock,cmd);
1706   /* get full pathname */
1707   get_abs_path(&file, file_tmp);
1708   if (!check_file_access(file, sock)) {
1709     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1710   }
1711   /* get data */
1712   rrd_clear_error ();
1713   info = rrd_info_r(file);
1714   if(!info) {
1715     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1716   }
1717   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1718       switch (data->type) {
1719       case RD_I_VAL:
1720           if (isnan(data->value.u_val))
1721               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1722           else
1723               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1724           break;
1725       case RD_I_CNT:
1726           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1727           break;
1728       case RD_I_INT:
1729           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1730           break;
1731       case RD_I_STR:
1732           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1733           break;
1734       case RD_I_BLO:
1735           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1736           break;
1737       }
1738   }
1739
1740   rrd_info_free(info);
1741
1742   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1743 } /* }}} static int handle_request_info  */
1744
1745 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1746 {
1747   char *i, *file, file_tmp[PATH_MAX];
1748   int status;
1749   int idx;
1750   time_t t;
1751
1752   /* obtain filename */
1753   status = buffer_get_field(&buffer, &buffer_size, &file);
1754   if (status != 0)
1755     return syntax_error(sock,cmd);
1756   /* get full pathname */
1757   get_abs_path(&file, file_tmp);
1758   if (!check_file_access(file, sock)) {
1759     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1760   }
1761
1762   status = buffer_get_field(&buffer, &buffer_size, &i);
1763   if (status != 0)
1764     return syntax_error(sock,cmd);
1765   idx = atoi(i);
1766   if(idx<0) { 
1767     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1768   }
1769
1770   /* get data */
1771   rrd_clear_error ();
1772   t = rrd_first_r(file,idx);
1773   if(t<1) {
1774     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1775   }
1776   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1777 } /* }}} static int handle_request_first  */
1778
1779
1780 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1781 {
1782   char *file, file_tmp[PATH_MAX];
1783   int status;
1784   time_t t, from_file, step;
1785   rrd_file_t * rrd_file;
1786   cache_item_t * ci;
1787   rrd_t rrd; 
1788
1789   /* obtain filename */
1790   status = buffer_get_field(&buffer, &buffer_size, &file);
1791   if (status != 0)
1792     return syntax_error(sock,cmd);
1793   /* get full pathname */
1794   get_abs_path(&file, file_tmp);
1795   if (!check_file_access(file, sock)) {
1796     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1797   }
1798   rrd_clear_error();
1799   rrd_init(&rrd);
1800   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1801   if(!rrd_file) {
1802     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1803   }
1804   from_file = rrd.live_head->last_up;
1805   step = rrd.stat_head->pdp_step;
1806   rrd_close(rrd_file);
1807   pthread_mutex_lock(&cache_lock);
1808   ci = g_tree_lookup(cache_tree, file);
1809   if (ci)
1810     t = ci->last_update_stamp;
1811   else
1812     t = from_file;
1813   pthread_mutex_unlock(&cache_lock);
1814   t -= t % step;
1815   rrd_free(&rrd);
1816   if(t<1) {
1817     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1818   }
1819   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1820 } /* }}} static int handle_request_last  */
1821
1822 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1823 {
1824   char *file, file_tmp[PATH_MAX];
1825   char *tok;
1826   int ac = 0;
1827   char *av[128];
1828   int status;
1829   unsigned long step = 300;
1830   time_t last_up = time(NULL)-10;
1831   int no_overwrite = opt_no_overwrite;
1832
1833
1834   /* obtain filename */
1835   status = buffer_get_field(&buffer, &buffer_size, &file);
1836   if (status != 0)
1837     return syntax_error(sock,cmd);
1838   /* get full pathname */
1839   get_abs_path(&file, file_tmp);
1840   if (!check_file_access(file, sock)) {
1841     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1842   }
1843   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1844
1845   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1846     if( ! strncmp(tok,"-b",2) ) {
1847       status = buffer_get_field(&buffer, &buffer_size, &tok );
1848       if (status != 0) return syntax_error(sock,cmd);
1849       last_up = (time_t) atol(tok);
1850       continue;
1851     }
1852     if( ! strncmp(tok,"-s",2) ) {
1853       status = buffer_get_field(&buffer, &buffer_size, &tok );
1854       if (status != 0) return syntax_error(sock,cmd);
1855       step = atol(tok);
1856       continue;
1857     }
1858     if( ! strncmp(tok,"-O",2) ) {
1859       no_overwrite = 1;
1860       continue;
1861     }
1862     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1863     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1864     return syntax_error(sock,cmd);
1865   }
1866   if(step<1) {
1867     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1868   }
1869   if (last_up < 3600 * 24 * 365 * 10) {
1870     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1871   }
1872
1873   rrd_clear_error ();
1874   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1875
1876   if(!status) {
1877     return send_response(sock, RESP_OK, "RRD created OK\n");
1878   }
1879   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1880 } /* }}} static int handle_request_create  */
1881
1882 /* start "BATCH" processing */
1883 static int batch_start (HANDLER_PROTO) /* {{{ */
1884 {
1885   int status;
1886   if (sock->batch_start)
1887     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1888
1889   status = send_response(sock, RESP_OK,
1890                          "Go ahead.  End with dot '.' on its own line.\n");
1891   sock->batch_start = time(NULL);
1892   sock->batch_cmd = 0;
1893
1894   return status;
1895 } /* }}} static int batch_start */
1896
1897 /* finish "BATCH" processing and return results to the client */
1898 static int batch_done (HANDLER_PROTO) /* {{{ */
1899 {
1900   assert(sock->batch_start);
1901   sock->batch_start = 0;
1902   sock->batch_cmd  = 0;
1903   return send_response(sock, RESP_OK, "errors\n");
1904 } /* }}} static int batch_done */
1905
1906 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1907 {
1908   return -1;
1909 } /* }}} static int handle_request_quit */
1910
1911 static command_t list_of_commands[] = { /* {{{ */
1912   {
1913     "UPDATE",
1914     handle_request_update,
1915     CMD_CONTEXT_ANY,
1916     "UPDATE <filename> <values> [<values> ...]\n"
1917     ,
1918     "Adds the given file to the internal cache if it is not yet known and\n"
1919     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1920     "for details.\n"
1921     "\n"
1922     "Each <values> has the following form:\n"
1923     "  <values> = <time>:<value>[:<value>[...]]\n"
1924     "See the rrdupdate(1) manpage for details.\n"
1925   },
1926   {
1927     "WROTE",
1928     handle_request_wrote,
1929     CMD_CONTEXT_JOURNAL,
1930     NULL,
1931     NULL
1932   },
1933   {
1934     "FLUSH",
1935     handle_request_flush,
1936     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1937     "FLUSH <filename>\n"
1938     ,
1939     "Adds the given filename to the head of the update queue and returns\n"
1940     "after it has been dequeued.\n"
1941   },
1942   {
1943     "FLUSHALL",
1944     handle_request_flushall,
1945     CMD_CONTEXT_CLIENT,
1946     "FLUSHALL\n"
1947     ,
1948     "Triggers writing of all pending updates.  Returns immediately.\n"
1949   },
1950   {
1951     "PENDING",
1952     handle_request_pending,
1953     CMD_CONTEXT_CLIENT,
1954     "PENDING <filename>\n"
1955     ,
1956     "Shows any 'pending' updates for a file, in order.\n"
1957     "The updates shown have not yet been written to the underlying RRD file.\n"
1958   },
1959   {
1960     "FORGET",
1961     handle_request_forget,
1962     CMD_CONTEXT_ANY,
1963     "FORGET <filename>\n"
1964     ,
1965     "Removes the file completely from the cache.\n"
1966     "Any pending updates for the file will be lost.\n"
1967   },
1968   {
1969     "QUEUE",
1970     handle_request_queue,
1971     CMD_CONTEXT_CLIENT,
1972     "QUEUE\n"
1973     ,
1974         "Shows all files in the output queue.\n"
1975     "The output is zero or more lines in the following format:\n"
1976     "(where <num_vals> is the number of values to be written)\n"
1977     "\n"
1978     "<num_vals> <filename>\n"
1979   },
1980   {
1981     "STATS",
1982     handle_request_stats,
1983     CMD_CONTEXT_CLIENT,
1984     "STATS\n"
1985     ,
1986     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1987     "a description of the values.\n"
1988   },
1989   {
1990     "HELP",
1991     handle_request_help,
1992     CMD_CONTEXT_CLIENT,
1993     "HELP [<command>]\n",
1994     NULL, /* special! */
1995   },
1996   {
1997     "BATCH",
1998     batch_start,
1999     CMD_CONTEXT_CLIENT,
2000     "BATCH\n"
2001     ,
2002     "The 'BATCH' command permits the client to initiate a bulk load\n"
2003     "   of commands to rrdcached.\n"
2004     "\n"
2005     "Usage:\n"
2006     "\n"
2007     "    client: BATCH\n"
2008     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2009     "    client: command #1\n"
2010     "    client: command #2\n"
2011     "    client: ... and so on\n"
2012     "    client: .\n"
2013     "    server: 2 errors\n"
2014     "    server: 7 message for command #7\n"
2015     "    server: 9 message for command #9\n"
2016     "\n"
2017     "For more information, consult the rrdcached(1) documentation.\n"
2018   },
2019   {
2020     ".",   /* BATCH terminator */
2021     batch_done,
2022     CMD_CONTEXT_BATCH,
2023     NULL,
2024     NULL
2025   },
2026   {
2027     "FETCH",
2028     handle_request_fetch,
2029     CMD_CONTEXT_CLIENT,
2030     "FETCH <file> <CF> [<start> [<end>]]\n"
2031     ,
2032     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2033   },
2034   {
2035     "INFO",
2036     handle_request_info,
2037     CMD_CONTEXT_CLIENT,
2038     "INFO <filename>\n",
2039     "The INFO command retrieves information about a specified RRD file.\n"
2040     "This is returned in standard rrdinfo format, a sequence of lines\n"
2041     "with the format <keyname> = <value>\n"
2042     "Note that this is the data as of the last update of the RRD file itself,\n"
2043     "not the last time data was received via rrdcached, so there may be pending\n"
2044     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2045   },
2046   {
2047     "FIRST",
2048     handle_request_first,
2049     CMD_CONTEXT_CLIENT,
2050     "FIRST <filename> <rra index>\n",
2051     "The FIRST command retrieves the first data time for a specified RRA in\n"
2052     "an RRD file.\n"
2053   },
2054   {
2055     "LAST",
2056     handle_request_last,
2057     CMD_CONTEXT_CLIENT,
2058     "LAST <filename>\n",
2059     "The LAST command retrieves the last update time for a specified RRD file.\n"
2060     "Note that this is the time of the last update of the RRD file itself, not\n"
2061     "the last time data was received via rrdcached, so there may be pending\n"
2062     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2063   },
2064   {
2065     "CREATE",
2066     handle_request_create,
2067     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2068     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2069     "The CREATE command will create an RRD file, overwriting any existing file\n"
2070     "unless the -O option is given or rrdcached was started with the -O option.\n"
2071     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2072     "not acceptable) and the step is in seconds (default is 300).\n"
2073     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2074   },
2075   {
2076     "QUIT",
2077     handle_request_quit,
2078     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2079     "QUIT\n"
2080     ,
2081     "Disconnect from rrdcached.\n"
2082   }
2083 }; /* }}} command_t list_of_commands[] */
2084 static size_t list_of_commands_len = sizeof (list_of_commands)
2085   / sizeof (list_of_commands[0]);
2086
2087 static command_t *find_command(char *cmd)
2088 {
2089   size_t i;
2090
2091   for (i = 0; i < list_of_commands_len; i++)
2092     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2093       return (&list_of_commands[i]);
2094   return NULL;
2095 }
2096
2097 /* We currently use the index in the `list_of_commands' array as a bit position
2098  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2099  * outside these functions so that switching to a more elegant storage method
2100  * is easily possible. */
2101 static ssize_t find_command_index (const char *cmd) /* {{{ */
2102 {
2103   size_t i;
2104
2105   for (i = 0; i < list_of_commands_len; i++)
2106     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2107       return ((ssize_t) i);
2108   return (-1);
2109 } /* }}} ssize_t find_command_index */
2110
2111 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2112     const char *cmd)
2113 {
2114   ssize_t i;
2115
2116   if (JOURNAL_REPLAY(sock))
2117     return (1);
2118
2119   if (cmd == NULL)
2120     return (-1);
2121
2122   if ((strcasecmp ("QUIT", cmd) == 0)
2123       || (strcasecmp ("HELP", cmd) == 0))
2124     return (1);
2125   else if (strcmp (".", cmd) == 0)
2126     cmd = "BATCH";
2127
2128   i = find_command_index (cmd);
2129   if (i < 0)
2130     return (-1);
2131   assert (i < 32);
2132
2133   if ((sock->permissions & (1 << i)) != 0)
2134     return (1);
2135   return (0);
2136 } /* }}} int socket_permission_check */
2137
2138 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2139     const char *cmd)
2140 {
2141   ssize_t i;
2142
2143   i = find_command_index (cmd);
2144   if (i < 0)
2145     return (-1);
2146   assert (i < 32);
2147
2148   sock->permissions |= (1 << i);
2149   return (0);
2150 } /* }}} int socket_permission_add */
2151
2152 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2153 {
2154   sock->permissions = 0;
2155 } /* }}} socket_permission_clear */
2156
2157 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2158     listen_socket_t *src)
2159 {
2160   dest->permissions = src->permissions;
2161 } /* }}} socket_permission_copy */
2162
2163 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2164 {
2165   size_t i;
2166
2167   sock->permissions = 0;
2168   for (i = 0; i < list_of_commands_len; i++)
2169     sock->permissions |= (1 << i);
2170 } /* }}} void socket_permission_set_all */
2171
2172 /* check whether commands are received in the expected context */
2173 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2174 {
2175   if (JOURNAL_REPLAY(sock))
2176     return (cmd->context & CMD_CONTEXT_JOURNAL);
2177   else if (sock->batch_start)
2178     return (cmd->context & CMD_CONTEXT_BATCH);
2179   else
2180     return (cmd->context & CMD_CONTEXT_CLIENT);
2181
2182   /* NOTREACHED */
2183   assert(1==0);
2184 }
2185
2186 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2187 {
2188   int status;
2189   char *cmd_str;
2190   char *resp_txt;
2191   command_t *help = NULL;
2192
2193   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2194   if (status == 0)
2195     help = find_command(cmd_str);
2196
2197   if (help && (help->syntax || help->help))
2198   {
2199     char tmp[RRD_CMD_MAX];
2200
2201     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2202     resp_txt = tmp;
2203
2204     if (help->syntax)
2205       add_response_info(sock, "Usage: %s\n", help->syntax);
2206
2207     if (help->help)
2208       add_response_info(sock, "%s\n", help->help);
2209   }
2210   else
2211   {
2212     size_t i;
2213
2214     resp_txt = "Command overview\n";
2215
2216     for (i = 0; i < list_of_commands_len; i++)
2217     {
2218       if (list_of_commands[i].syntax == NULL)
2219         continue;
2220       add_response_info (sock, "%s", list_of_commands[i].syntax);
2221     }
2222   }
2223
2224   return send_response(sock, RESP_OK, resp_txt);
2225 } /* }}} int handle_request_help */
2226
2227 static int handle_request (DISPATCH_PROTO) /* {{{ */
2228 {
2229   char *buffer_ptr = buffer;
2230   char *cmd_str = NULL;
2231   command_t *cmd = NULL;
2232   int status;
2233
2234   assert (buffer[buffer_size - 1] == '\0');
2235
2236   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2237   if (status != 0)
2238   {
2239     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2240     return (-1);
2241   }
2242
2243   if (sock != NULL && sock->batch_start)
2244     sock->batch_cmd++;
2245
2246   cmd = find_command(cmd_str);
2247   if (!cmd)
2248     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2249
2250   if (!socket_permission_check (sock, cmd->cmd))
2251     return send_response(sock, RESP_ERR, "Permission denied.\n");
2252
2253   if (!command_check_context(sock, cmd))
2254     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2255
2256   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2257 } /* }}} int handle_request */
2258
2259 static void journal_set_free (journal_set *js) /* {{{ */
2260 {
2261   if (js == NULL)
2262     return;
2263
2264   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2265
2266   free(js);
2267 } /* }}} journal_set_free */
2268
2269 static void journal_set_remove (journal_set *js) /* {{{ */
2270 {
2271   if (js == NULL)
2272     return;
2273
2274   for (uint i=0; i < js->files_num; i++)
2275   {
2276     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2277     unlink(js->files[i]);
2278   }
2279 } /* }}} journal_set_remove */
2280
2281 /* close current journal file handle.
2282  * MUST hold journal_lock before calling */
2283 static void journal_close(void) /* {{{ */
2284 {
2285   if (journal_fh != NULL)
2286   {
2287     if (fclose(journal_fh) != 0)
2288       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2289   }
2290
2291   journal_fh = NULL;
2292   journal_size = 0;
2293 } /* }}} journal_close */
2294
2295 /* MUST hold journal_lock before calling */
2296 static void journal_new_file(void) /* {{{ */
2297 {
2298   struct timeval now;
2299   int  new_fd;
2300   char new_file[PATH_MAX + 1];
2301
2302   assert(journal_dir != NULL);
2303   assert(journal_cur != NULL);
2304
2305   journal_close();
2306
2307   gettimeofday(&now, NULL);
2308   /* this format assures that the files sort in strcmp() order */
2309   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2310            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2311
2312   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2313                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2314   if (new_fd < 0)
2315     goto error;
2316
2317   journal_fh = fdopen(new_fd, "a");
2318   if (journal_fh == NULL)
2319     goto error;
2320
2321   journal_size = ftell(journal_fh);
2322   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2323
2324   /* record the file in the journal set */
2325   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2326
2327   return;
2328
2329 error:
2330   RRDD_LOG(LOG_CRIT,
2331            "JOURNALING DISABLED: Error while trying to create %s : %s",
2332            new_file, rrd_strerror(errno));
2333   RRDD_LOG(LOG_CRIT,
2334            "JOURNALING DISABLED: All values will be flushed at shutdown");
2335
2336   close(new_fd);
2337   config_flush_at_shutdown = 1;
2338
2339 } /* }}} journal_new_file */
2340
2341 /* MUST NOT hold journal_lock before calling this */
2342 static void journal_rotate(void) /* {{{ */
2343 {
2344   journal_set *old_js = NULL;
2345
2346   if (journal_dir == NULL)
2347     return;
2348
2349   RRDD_LOG(LOG_DEBUG, "rotating journals");
2350
2351   pthread_mutex_lock(&stats_lock);
2352   ++stats_journal_rotate;
2353   pthread_mutex_unlock(&stats_lock);
2354
2355   pthread_mutex_lock(&journal_lock);
2356
2357   journal_close();
2358
2359   /* rotate the journal sets */
2360   old_js = journal_old;
2361   journal_old = journal_cur;
2362   journal_cur = calloc(1, sizeof(journal_set));
2363
2364   if (journal_cur != NULL)
2365     journal_new_file();
2366   else
2367     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2368
2369   pthread_mutex_unlock(&journal_lock);
2370
2371   journal_set_remove(old_js);
2372   journal_set_free  (old_js);
2373
2374 } /* }}} static void journal_rotate */
2375
2376 /* MUST hold journal_lock when calling */
2377 static void journal_done(void) /* {{{ */
2378 {
2379   if (journal_cur == NULL)
2380     return;
2381
2382   journal_close();
2383
2384   if (config_flush_at_shutdown)
2385   {
2386     RRDD_LOG(LOG_INFO, "removing journals");
2387     journal_set_remove(journal_old);
2388     journal_set_remove(journal_cur);
2389   }
2390   else
2391   {
2392     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2393              "journals will be used at next startup");
2394   }
2395
2396   journal_set_free(journal_cur);
2397   journal_set_free(journal_old);
2398   free(journal_dir);
2399
2400 } /* }}} static void journal_done */
2401
2402 static int journal_write(char *cmd, char *args) /* {{{ */
2403 {
2404   int chars;
2405
2406   if (journal_fh == NULL)
2407     return 0;
2408
2409   pthread_mutex_lock(&journal_lock);
2410   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2411   journal_size += chars;
2412
2413   if (journal_size > JOURNAL_MAX)
2414     journal_new_file();
2415
2416   pthread_mutex_unlock(&journal_lock);
2417
2418   if (chars > 0)
2419   {
2420     pthread_mutex_lock(&stats_lock);
2421     stats_journal_bytes += chars;
2422     pthread_mutex_unlock(&stats_lock);
2423   }
2424
2425   return chars;
2426 } /* }}} static int journal_write */
2427
2428 static int journal_replay (const char *file) /* {{{ */
2429 {
2430   FILE *fh;
2431   int entry_cnt = 0;
2432   int fail_cnt = 0;
2433   uint64_t line = 0;
2434   char entry[RRD_CMD_MAX];
2435   time_t now;
2436
2437   if (file == NULL) return 0;
2438
2439   {
2440     char *reason = "unknown error";
2441     int status = 0;
2442     struct stat statbuf;
2443
2444     memset(&statbuf, 0, sizeof(statbuf));
2445     if (stat(file, &statbuf) != 0)
2446     {
2447       reason = "stat error";
2448       status = errno;
2449     }
2450     else if (!S_ISREG(statbuf.st_mode))
2451     {
2452       reason = "not a regular file";
2453       status = EPERM;
2454     }
2455     if (statbuf.st_uid != daemon_uid)
2456     {
2457       reason = "not owned by daemon user";
2458       status = EACCES;
2459     }
2460     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2461     {
2462       reason = "must not be user/group writable";
2463       status = EACCES;
2464     }
2465
2466     if (status != 0)
2467     {
2468       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2469                file, rrd_strerror(status), reason);
2470       return 0;
2471     }
2472   }
2473
2474   fh = fopen(file, "r");
2475   if (fh == NULL)
2476   {
2477     if (errno != ENOENT)
2478       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2479                file, rrd_strerror(errno));
2480     return 0;
2481   }
2482   else
2483     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2484
2485   now = time(NULL);
2486
2487   while(!feof(fh))
2488   {
2489     size_t entry_len;
2490
2491     ++line;
2492     if (fgets(entry, sizeof(entry), fh) == NULL)
2493       break;
2494     entry_len = strlen(entry);
2495
2496     /* check \n termination in case journal writing crashed mid-line */
2497     if (entry_len == 0)
2498       continue;
2499     else if (entry[entry_len - 1] != '\n')
2500     {
2501       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2502       ++fail_cnt;
2503       continue;
2504     }
2505
2506     entry[entry_len - 1] = '\0';
2507
2508     if (handle_request(NULL, now, entry, entry_len) == 0)
2509       ++entry_cnt;
2510     else
2511       ++fail_cnt;
2512   }
2513
2514   fclose(fh);
2515
2516   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2517            entry_cnt, fail_cnt);
2518
2519   return entry_cnt > 0 ? 1 : 0;
2520 } /* }}} static int journal_replay */
2521
2522 static int journal_sort(const void *v1, const void *v2)
2523 {
2524   char **jn1 = (char **) v1;
2525   char **jn2 = (char **) v2;
2526
2527   return strcmp(*jn1,*jn2);
2528 }
2529
2530 static void journal_init(void) /* {{{ */
2531 {
2532   int had_journal = 0;
2533   DIR *dir;
2534   struct dirent *dent;
2535   char path[PATH_MAX+1];
2536
2537   if (journal_dir == NULL) return;
2538
2539   pthread_mutex_lock(&journal_lock);
2540
2541   journal_cur = calloc(1, sizeof(journal_set));
2542   if (journal_cur == NULL)
2543   {
2544     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2545     return;
2546   }
2547
2548   RRDD_LOG(LOG_INFO, "checking for journal files");
2549
2550   /* Handle old journal files during transition.  This gives them the
2551    * correct sort order.  TODO: remove after first release
2552    */
2553   {
2554     char old_path[PATH_MAX+1];
2555     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2556     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2557     rename(old_path, path);
2558
2559     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2560     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2561     rename(old_path, path);
2562   }
2563
2564   dir = opendir(journal_dir);
2565   if (!dir) {
2566     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2567     return;
2568   }
2569   while ((dent = readdir(dir)) != NULL)
2570   {
2571     /* looks like a journal file? */
2572     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2573       continue;
2574
2575     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2576
2577     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2578     {
2579       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2580                dent->d_name);
2581       break;
2582     }
2583   }
2584   closedir(dir);
2585
2586   qsort(journal_cur->files, journal_cur->files_num,
2587         sizeof(journal_cur->files[0]), journal_sort);
2588
2589   for (uint i=0; i < journal_cur->files_num; i++)
2590     had_journal += journal_replay(journal_cur->files[i]);
2591
2592   journal_new_file();
2593
2594   /* it must have been a crash.  start a flush */
2595   if (had_journal && config_flush_at_shutdown)
2596     flush_old_values(-1);
2597
2598   pthread_mutex_unlock(&journal_lock);
2599
2600   RRDD_LOG(LOG_INFO, "journal processing complete");
2601
2602 } /* }}} static void journal_init */
2603
2604 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2605 {
2606   assert(sock != NULL);
2607
2608   free(sock->rbuf);  sock->rbuf = NULL;
2609   free(sock->wbuf);  sock->wbuf = NULL;
2610   free(sock);
2611 } /* }}} void free_listen_socket */
2612
2613 static void close_connection(listen_socket_t *sock) /* {{{ */
2614 {
2615   if (sock->fd >= 0)
2616   {
2617     close(sock->fd);
2618     sock->fd = -1;
2619   }
2620
2621   free_listen_socket(sock);
2622
2623 } /* }}} void close_connection */
2624
2625 static void *connection_thread_main (void *args) /* {{{ */
2626 {
2627   listen_socket_t *sock;
2628   int fd;
2629
2630   sock = (listen_socket_t *) args;
2631   fd = sock->fd;
2632
2633   /* init read buffers */
2634   sock->next_read = sock->next_cmd = 0;
2635   sock->rbuf = malloc(RBUF_SIZE);
2636   if (sock->rbuf == NULL)
2637   {
2638     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2639     close_connection(sock);
2640     return NULL;
2641   }
2642
2643   pthread_mutex_lock (&connection_threads_lock);
2644 #ifdef HAVE_LIBWRAP
2645   /* LIBWRAP does not support multiple threads! By putting this code
2646      inside pthread_mutex_lock we do not have to worry about request_info
2647      getting overwritten by another thread.
2648   */
2649   struct request_info req;
2650   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2651   fromhost(&req);
2652   if(!hosts_access(&req)) {
2653     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2654     pthread_mutex_unlock (&connection_threads_lock);
2655     close_connection(sock);
2656     return NULL;
2657   }
2658 #endif /* HAVE_LIBWRAP */
2659   connection_threads_num++;
2660   pthread_mutex_unlock (&connection_threads_lock);
2661
2662   while (state == RUNNING)
2663   {
2664     char *cmd;
2665     ssize_t cmd_len;
2666     ssize_t rbytes;
2667     time_t now;
2668
2669     struct pollfd pollfd;
2670     int status;
2671
2672     pollfd.fd = fd;
2673     pollfd.events = POLLIN | POLLPRI;
2674     pollfd.revents = 0;
2675
2676     status = poll (&pollfd, 1, /* timeout = */ 500);
2677     if (state != RUNNING)
2678       break;
2679     else if (status == 0) /* timeout */
2680       continue;
2681     else if (status < 0) /* error */
2682     {
2683       status = errno;
2684       if (status != EINTR)
2685         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2686       continue;
2687     }
2688
2689     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2690       break;
2691     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2692     {
2693       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2694           "poll(2) returned something unexpected: %#04hx",
2695           pollfd.revents);
2696       break;
2697     }
2698
2699     rbytes = read(fd, sock->rbuf + sock->next_read,
2700                   RBUF_SIZE - sock->next_read);
2701     if (rbytes < 0)
2702     {
2703       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2704       break;
2705     }
2706     else if (rbytes == 0)
2707       break; /* eof */
2708
2709     sock->next_read += rbytes;
2710
2711     if (sock->batch_start)
2712       now = sock->batch_start;
2713     else
2714       now = time(NULL);
2715
2716     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2717     {
2718       status = handle_request (sock, now, cmd, cmd_len+1);
2719       if (status != 0)
2720         goto out_close;
2721     }
2722   }
2723
2724 out_close:
2725   close_connection(sock);
2726
2727   /* Remove this thread from the connection threads list */
2728   pthread_mutex_lock (&connection_threads_lock);
2729   connection_threads_num--;
2730   if (connection_threads_num <= 0)
2731     pthread_cond_broadcast(&connection_threads_done);
2732   pthread_mutex_unlock (&connection_threads_lock);
2733
2734   return (NULL);
2735 } /* }}} void *connection_thread_main */
2736
2737 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2738 {
2739   int fd;
2740   struct sockaddr_un sa;
2741   listen_socket_t *temp;
2742   int status;
2743   const char *path;
2744   char *path_copy, *dir;
2745
2746   path = sock->addr;
2747   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2748     path += strlen("unix:");
2749
2750   /* dirname may modify its argument */
2751   path_copy = strdup(path);
2752   if (path_copy == NULL)
2753   {
2754     fprintf(stderr, "rrdcached: strdup(): %s\n",
2755         rrd_strerror(errno));
2756     return (-1);
2757   }
2758
2759   dir = dirname(path_copy);
2760   if (rrd_mkdir_p(dir, 0777) != 0)
2761   {
2762     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2763         dir, rrd_strerror(errno));
2764     return (-1);
2765   }
2766
2767   free(path_copy);
2768
2769   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2770       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2771   if (temp == NULL)
2772   {
2773     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2774     return (-1);
2775   }
2776   listen_fds = temp;
2777   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2778
2779   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2780   if (fd < 0)
2781   {
2782     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2783              rrd_strerror(errno));
2784     return (-1);
2785   }
2786
2787   memset (&sa, 0, sizeof (sa));
2788   sa.sun_family = AF_UNIX;
2789   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2790
2791   /* if we've gotten this far, we own the pid file.  any daemon started
2792    * with the same args must not be alive.  therefore, ensure that we can
2793    * create the socket...
2794    */
2795   unlink(path);
2796
2797   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2798   if (status != 0)
2799   {
2800     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2801              path, rrd_strerror(errno));
2802     close (fd);
2803     return (-1);
2804   }
2805
2806   /* tweak the sockets group ownership */
2807   if (sock->socket_group != (gid_t)-1)
2808   {
2809     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2810          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2811     {
2812       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2813     }
2814   }
2815
2816   if (sock->socket_permissions != (mode_t)-1)
2817   {
2818     if (chmod(path, sock->socket_permissions) != 0)
2819       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2820           (unsigned int)sock->socket_permissions, strerror(errno));
2821   }
2822
2823   status = listen (fd, /* backlog = */ 10);
2824   if (status != 0)
2825   {
2826     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2827              path, rrd_strerror(errno));
2828     close (fd);
2829     unlink (path);
2830     return (-1);
2831   }
2832
2833   listen_fds[listen_fds_num].fd = fd;
2834   listen_fds[listen_fds_num].family = PF_UNIX;
2835   strncpy(listen_fds[listen_fds_num].addr, path,
2836           sizeof (listen_fds[listen_fds_num].addr) - 1);
2837   listen_fds_num++;
2838
2839   return (0);
2840 } /* }}} int open_listen_socket_unix */
2841
2842 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2843 {
2844   struct addrinfo ai_hints;
2845   struct addrinfo *ai_res;
2846   struct addrinfo *ai_ptr;
2847   char addr_copy[NI_MAXHOST];
2848   char *addr;
2849   char *port;
2850   int status;
2851
2852   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2853   addr_copy[sizeof (addr_copy) - 1] = 0;
2854   addr = addr_copy;
2855
2856   memset (&ai_hints, 0, sizeof (ai_hints));
2857   ai_hints.ai_flags = 0;
2858 #ifdef AI_ADDRCONFIG
2859   ai_hints.ai_flags |= AI_ADDRCONFIG;
2860 #endif
2861   ai_hints.ai_family = AF_UNSPEC;
2862   ai_hints.ai_socktype = SOCK_STREAM;
2863
2864   port = NULL;
2865   if (*addr == '[') /* IPv6+port format */
2866   {
2867     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2868     addr++;
2869
2870     port = strchr (addr, ']');
2871     if (port == NULL)
2872     {
2873       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2874       return (-1);
2875     }
2876     *port = 0;
2877     port++;
2878
2879     if (*port == ':')
2880       port++;
2881     else if (*port == 0)
2882       port = NULL;
2883     else
2884     {
2885       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2886       return (-1);
2887     }
2888   } /* if (*addr == '[') */
2889   else
2890   {
2891     port = rindex(addr, ':');
2892     if (port != NULL)
2893     {
2894       *port = 0;
2895       port++;
2896     }
2897   }
2898   ai_res = NULL;
2899   status = getaddrinfo (addr,
2900                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2901                         &ai_hints, &ai_res);
2902   if (status != 0)
2903   {
2904     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2905              addr, gai_strerror (status));
2906     return (-1);
2907   }
2908
2909   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2910   {
2911     int fd;
2912     listen_socket_t *temp;
2913     int one = 1;
2914
2915     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2916         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2917     if (temp == NULL)
2918     {
2919       fprintf (stderr,
2920                "rrdcached: open_listen_socket_network: realloc failed.\n");
2921       continue;
2922     }
2923     listen_fds = temp;
2924     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2925
2926     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2927     if (fd < 0)
2928     {
2929       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2930                rrd_strerror(errno));
2931       continue;
2932     }
2933
2934     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2935
2936     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2937     if (status != 0)
2938     {
2939       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2940                sock->addr, rrd_strerror(errno));
2941       close (fd);
2942       continue;
2943     }
2944
2945     status = listen (fd, /* backlog = */ 10);
2946     if (status != 0)
2947     {
2948       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2949                sock->addr, rrd_strerror(errno));
2950       close (fd);
2951       freeaddrinfo(ai_res);
2952       return (-1);
2953     }
2954
2955     listen_fds[listen_fds_num].fd = fd;
2956     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2957     listen_fds_num++;
2958   } /* for (ai_ptr) */
2959
2960   freeaddrinfo(ai_res);
2961   return (0);
2962 } /* }}} static int open_listen_socket_network */
2963
2964 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2965 {
2966   assert(sock != NULL);
2967   assert(sock->addr != NULL);
2968
2969   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2970       || sock->addr[0] == '/')
2971     return (open_listen_socket_unix(sock));
2972   else
2973     return (open_listen_socket_network(sock));
2974 } /* }}} int open_listen_socket */
2975
2976 static int close_listen_sockets (void) /* {{{ */
2977 {
2978   size_t i;
2979
2980   for (i = 0; i < listen_fds_num; i++)
2981   {
2982     close (listen_fds[i].fd);
2983
2984     if (listen_fds[i].family == PF_UNIX)
2985       unlink(listen_fds[i].addr);
2986   }
2987
2988   free (listen_fds);
2989   listen_fds = NULL;
2990   listen_fds_num = 0;
2991
2992   return (0);
2993 } /* }}} int close_listen_sockets */
2994
2995 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2996 {
2997   struct pollfd *pollfds;
2998   int pollfds_num;
2999   int status;
3000   int i;
3001
3002   if (listen_fds_num < 1)
3003   {
3004     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3005     return (NULL);
3006   }
3007
3008   pollfds_num = listen_fds_num;
3009   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3010   if (pollfds == NULL)
3011   {
3012     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3013     return (NULL);
3014   }
3015   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3016
3017   RRDD_LOG(LOG_INFO, "listening for connections");
3018
3019   while (state == RUNNING)
3020   {
3021     for (i = 0; i < pollfds_num; i++)
3022     {
3023       pollfds[i].fd = listen_fds[i].fd;
3024       pollfds[i].events = POLLIN | POLLPRI;
3025       pollfds[i].revents = 0;
3026     }
3027
3028     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3029     if (state != RUNNING)
3030       break;
3031     else if (status == 0) /* timeout */
3032       continue;
3033     else if (status < 0) /* error */
3034     {
3035       status = errno;
3036       if (status != EINTR)
3037       {
3038         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3039       }
3040       continue;
3041     }
3042
3043     for (i = 0; i < pollfds_num; i++)
3044     {
3045       listen_socket_t *client_sock;
3046       struct sockaddr_storage client_sa;
3047       socklen_t client_sa_size;
3048       pthread_t tid;
3049       pthread_attr_t attr;
3050
3051       if (pollfds[i].revents == 0)
3052         continue;
3053
3054       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3055       {
3056         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3057             "poll(2) returned something unexpected for listen FD #%i.",
3058             pollfds[i].fd);
3059         continue;
3060       }
3061
3062       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3063       if (client_sock == NULL)
3064       {
3065         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3066         continue;
3067       }
3068       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3069
3070       client_sa_size = sizeof (client_sa);
3071       client_sock->fd = accept (pollfds[i].fd,
3072           (struct sockaddr *) &client_sa, &client_sa_size);
3073       if (client_sock->fd < 0)
3074       {
3075         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3076         free(client_sock);
3077         continue;
3078       }
3079
3080       pthread_attr_init (&attr);
3081       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3082
3083       status = pthread_create (&tid, &attr, connection_thread_main,
3084                                client_sock);
3085       if (status != 0)
3086       {
3087         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3088         close_connection(client_sock);
3089         continue;
3090       }
3091     } /* for (pollfds_num) */
3092   } /* while (state == RUNNING) */
3093
3094   RRDD_LOG(LOG_INFO, "starting shutdown");
3095
3096   close_listen_sockets ();
3097
3098   pthread_mutex_lock (&connection_threads_lock);
3099   while (connection_threads_num > 0)
3100     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3101   pthread_mutex_unlock (&connection_threads_lock);
3102
3103   free(pollfds);
3104
3105   return (NULL);
3106 } /* }}} void *listen_thread_main */
3107
3108 static int daemonize (void) /* {{{ */
3109 {
3110   int pid_fd;
3111   char *base_dir;
3112
3113   daemon_uid = geteuid();
3114
3115   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3116   if (pid_fd < 0)
3117     pid_fd = check_pidfile();
3118   if (pid_fd < 0)
3119     return pid_fd;
3120
3121   /* open all the listen sockets */
3122   if (config_listen_address_list_len > 0)
3123   {
3124     for (size_t i = 0; i < config_listen_address_list_len; i++)
3125       open_listen_socket (config_listen_address_list[i]);
3126
3127     rrd_free_ptrs((void ***) &config_listen_address_list,
3128                   &config_listen_address_list_len);
3129   }
3130   else
3131   {
3132     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3133         sizeof(default_socket.addr) - 1);
3134     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3135
3136     if (default_socket.permissions == 0)
3137       socket_permission_set_all (&default_socket);
3138
3139     open_listen_socket (&default_socket);
3140   }
3141
3142   if (listen_fds_num < 1)
3143   {
3144     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3145     goto error;
3146   }
3147
3148   if (!stay_foreground)
3149   {
3150     pid_t child;
3151
3152     child = fork ();
3153     if (child < 0)
3154     {
3155       fprintf (stderr, "daemonize: fork(2) failed.\n");
3156       goto error;
3157     }
3158     else if (child > 0)
3159       exit(0);
3160
3161     /* Become session leader */
3162     setsid ();
3163
3164     /* Open the first three file descriptors to /dev/null */
3165     close (2);
3166     close (1);
3167     close (0);
3168
3169     open ("/dev/null", O_RDWR);
3170     if (dup(0) == -1 || dup(0) == -1){
3171         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3172     }
3173   } /* if (!stay_foreground) */
3174
3175   /* Change into the /tmp directory. */
3176   base_dir = (config_base_dir != NULL)
3177     ? config_base_dir
3178     : "/tmp";
3179
3180   if (chdir (base_dir) != 0)
3181   {
3182     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3183     goto error;
3184   }
3185
3186   install_signal_handlers();
3187
3188   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3189   RRDD_LOG(LOG_INFO, "starting up");
3190
3191   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3192                                 (GDestroyNotify) free_cache_item);
3193   if (cache_tree == NULL)
3194   {
3195     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3196     goto error;
3197   }
3198
3199   return write_pidfile (pid_fd);
3200
3201 error:
3202   remove_pidfile();
3203   return -1;
3204 } /* }}} int daemonize */
3205
3206 static int cleanup (void) /* {{{ */
3207 {
3208   pthread_cond_broadcast (&flush_cond);
3209   pthread_join (flush_thread, NULL);
3210
3211   pthread_cond_broadcast (&queue_cond);
3212   for (int i = 0; i < config_queue_threads; i++)
3213     pthread_join (queue_threads[i], NULL);
3214
3215   if (config_flush_at_shutdown)
3216   {
3217     assert(cache_queue_head == NULL);
3218     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3219   }
3220
3221   free(queue_threads);
3222   free(config_base_dir);
3223
3224   pthread_mutex_lock(&cache_lock);
3225   g_tree_destroy(cache_tree);
3226
3227   pthread_mutex_lock(&journal_lock);
3228   journal_done();
3229
3230   RRDD_LOG(LOG_INFO, "goodbye");
3231   closelog ();
3232
3233   remove_pidfile ();
3234   free(config_pid_file);
3235
3236   return (0);
3237 } /* }}} int cleanup */
3238
3239 static int read_options (int argc, char **argv) /* {{{ */
3240 {
3241   int option;
3242   int status = 0;
3243
3244   socket_permission_clear (&default_socket);
3245
3246   default_socket.socket_group = (gid_t)-1;
3247   default_socket.socket_permissions = (mode_t)-1;
3248
3249   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3250   {
3251     switch (option)
3252     {
3253       case 'O':
3254         opt_no_overwrite = 1;
3255         break;
3256
3257       case 'g':
3258         stay_foreground=1;
3259         break;
3260
3261       case 'l':
3262       {
3263         listen_socket_t *new;
3264
3265         new = malloc(sizeof(listen_socket_t));
3266         if (new == NULL)
3267         {
3268           fprintf(stderr, "read_options: malloc failed.\n");
3269           return(2);
3270         }
3271         memset(new, 0, sizeof(listen_socket_t));
3272
3273         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3274
3275         /* Add permissions to the socket {{{ */
3276         if (default_socket.permissions != 0)
3277         {
3278           socket_permission_copy (new, &default_socket);
3279         }
3280         else /* if (default_socket.permissions == 0) */
3281         {
3282           /* Add permission for ALL commands to the socket. */
3283           socket_permission_set_all (new);
3284         }
3285         /* }}} Done adding permissions. */
3286
3287         new->socket_group = default_socket.socket_group;
3288         new->socket_permissions = default_socket.socket_permissions;
3289
3290         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3291                          &config_listen_address_list_len, new))
3292         {
3293           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3294           return (2);
3295         }
3296       }
3297       break;
3298
3299       /* set socket group permissions */
3300       case 's':
3301       {
3302         gid_t group_gid;
3303         struct group *grp;
3304
3305         group_gid = strtoul(optarg, NULL, 10);
3306         if (errno != EINVAL && group_gid>0)
3307         {
3308           /* we were passed a number */
3309           grp = getgrgid(group_gid);
3310         }
3311         else
3312         {
3313           grp = getgrnam(optarg);
3314         }
3315
3316         if (grp)
3317         {
3318           default_socket.socket_group = grp->gr_gid;
3319         }
3320         else
3321         {
3322           /* no idea what the user wanted... */
3323           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3324           return (5);
3325         }
3326       }
3327       break;
3328
3329       /* set socket file permissions */
3330       case 'm':
3331       {
3332         long  tmp;
3333         char *endptr = NULL;
3334
3335         tmp = strtol (optarg, &endptr, 8);
3336         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3337             || (tmp > 07777) || (tmp < 0)) {
3338           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3339               optarg);
3340           return (5);
3341         }
3342
3343         default_socket.socket_permissions = (mode_t)tmp;
3344       }
3345       break;
3346
3347       case 'P':
3348       {
3349         char *optcopy;
3350         char *saveptr;
3351         char *dummy;
3352         char *ptr;
3353
3354         socket_permission_clear (&default_socket);
3355
3356         optcopy = strdup (optarg);
3357         dummy = optcopy;
3358         saveptr = NULL;
3359         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3360         {
3361           dummy = NULL;
3362           status = socket_permission_add (&default_socket, ptr);
3363           if (status != 0)
3364           {
3365             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3366                 "socket failed. Most likely, this permission doesn't "
3367                 "exist. Check your command line.\n", ptr);
3368             status = 4;
3369           }
3370         }
3371
3372         free (optcopy);
3373       }
3374       break;
3375
3376       case 'f':
3377       {
3378         int temp;
3379
3380         temp = atoi (optarg);
3381         if (temp > 0)
3382           config_flush_interval = temp;
3383         else
3384         {
3385           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3386           status = 3;
3387         }
3388       }
3389       break;
3390
3391       case 'w':
3392       {
3393         int temp;
3394
3395         temp = atoi (optarg);
3396         if (temp > 0)
3397           config_write_interval = temp;
3398         else
3399         {
3400           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3401           status = 2;
3402         }
3403       }
3404       break;
3405
3406       case 'z':
3407       {
3408         int temp;
3409
3410         temp = atoi(optarg);
3411         if (temp > 0)
3412           config_write_jitter = temp;
3413         else
3414         {
3415           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3416           status = 2;
3417         }
3418
3419         break;
3420       }
3421
3422       case 't':
3423       {
3424         int threads;
3425         threads = atoi(optarg);
3426         if (threads >= 1)
3427           config_queue_threads = threads;
3428         else
3429         {
3430           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3431           return 1;
3432         }
3433       }
3434       break;
3435
3436       case 'B':
3437         config_write_base_only = 1;
3438         break;
3439
3440       case 'b':
3441       {
3442         size_t len;
3443         char base_realpath[PATH_MAX];
3444
3445         if (config_base_dir != NULL)
3446           free (config_base_dir);
3447         config_base_dir = strdup (optarg);
3448         if (config_base_dir == NULL)
3449         {
3450           fprintf (stderr, "read_options: strdup failed.\n");
3451           return (3);
3452         }
3453
3454         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3455         {
3456           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3457               config_base_dir, rrd_strerror (errno));
3458           return (3);
3459         }
3460
3461         /* make sure that the base directory is not resolved via
3462          * symbolic links.  this makes some performance-enhancing
3463          * assumptions possible (we don't have to resolve paths
3464          * that start with a "/")
3465          */
3466         if (realpath(config_base_dir, base_realpath) == NULL)
3467         {
3468           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3469               "%s\n", config_base_dir, rrd_strerror(errno));
3470           return 5;
3471         }
3472
3473         len = strlen (config_base_dir);
3474         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3475         {
3476           config_base_dir[len - 1] = 0;
3477           len--;
3478         }
3479
3480         if (len < 1)
3481         {
3482           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3483           return (4);
3484         }
3485
3486         _config_base_dir_len = len;
3487
3488         len = strlen (base_realpath);
3489         while ((len > 0) && (base_realpath[len - 1] == '/'))
3490         {
3491           base_realpath[len - 1] = '\0';
3492           len--;
3493         }
3494
3495         if (strncmp(config_base_dir,
3496                          base_realpath, sizeof(base_realpath)) != 0)
3497         {
3498           fprintf(stderr,
3499                   "Base directory (-b) resolved via file system links!\n"
3500                   "Please consult rrdcached '-b' documentation!\n"
3501                   "Consider specifying the real directory (%s)\n",
3502                   base_realpath);
3503           return 5;
3504         }
3505       }
3506       break;
3507
3508       case 'p':
3509       {
3510         if (config_pid_file != NULL)
3511           free (config_pid_file);
3512         config_pid_file = strdup (optarg);
3513         if (config_pid_file == NULL)
3514         {
3515           fprintf (stderr, "read_options: strdup failed.\n");
3516           return (3);
3517         }
3518       }
3519       break;
3520
3521       case 'F':
3522         config_flush_at_shutdown = 1;
3523         break;
3524
3525       case 'j':
3526       {
3527         char journal_dir_actual[PATH_MAX];
3528         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3529         if (journal_dir)
3530         {
3531           // if we were able to properly resolve the path, lets have a copy
3532           // for use outside this block.
3533           journal_dir = strdup(journal_dir);           
3534           status = rrd_mkdir_p(journal_dir, 0777);
3535           if (status != 0)
3536           {
3537             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3538                     journal_dir, rrd_strerror(errno));
3539             return 6;
3540           }
3541           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3542           {
3543             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3544                     errno ? rrd_strerror(errno) : "");
3545             return 6;
3546           }
3547         } else {
3548           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3549                   errno ? rrd_strerror(errno) : "");
3550           return 6;
3551         }
3552       }
3553       break;
3554
3555       case 'a':
3556       {
3557         int temp = atoi(optarg);
3558         if (temp > 0)
3559           config_alloc_chunk = temp;
3560         else
3561         {
3562           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3563           return 10;
3564         }
3565       }
3566       break;
3567
3568       case 'h':
3569       case '?':
3570         printf ("RRDCacheD %s\n"
3571             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3572             "\n"
3573             "Usage: rrdcached [options]\n"
3574             "\n"
3575             "Valid options are:\n"
3576             "  -l <address>  Socket address to listen to.\n"
3577             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3578             "  -P <perms>    Sets the permissions to assign to all following "
3579                             "sockets\n"
3580             "  -w <seconds>  Interval in which to write data.\n"
3581             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3582             "  -t <threads>  Number of write threads.\n"
3583             "  -f <seconds>  Interval in which to flush dead data.\n"
3584             "  -p <file>     Location of the PID-file.\n"
3585             "  -b <dir>      Base directory to change to.\n"
3586             "  -B            Restrict file access to paths within -b <dir>\n"
3587             "  -g            Do not fork and run in the foreground.\n"
3588             "  -j <dir>      Directory in which to create the journal files.\n"
3589             "  -F            Always flush all updates at shutdown\n"
3590             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3591             "                (the socket will also have read/write permissions "
3592                             "for that group)\n"
3593             "  -m <mode>     File permissions (octal) of all following UNIX "
3594                             "sockets\n"
3595             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3596             "  -O            Do not allow CREATE commands to overwrite existing\n"
3597             "                files, even if asked to.\n"
3598             "\n"
3599             "For more information and a detailed description of all options "
3600             "please refer\n"
3601             "to the rrdcached(1) manual page.\n",
3602             VERSION);
3603         if (option == 'h')
3604           status = -1;
3605         else
3606           status = 1;
3607         break;
3608     } /* switch (option) */
3609   } /* while (getopt) */
3610
3611   /* advise the user when values are not sane */
3612   if (config_flush_interval < 2 * config_write_interval)
3613     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3614             " 2x write interval (-w) !\n");
3615   if (config_write_jitter > config_write_interval)
3616     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3617             " write interval (-w) !\n");
3618
3619   if (config_write_base_only && config_base_dir == NULL)
3620     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3621             "  Consult the rrdcached documentation\n");
3622
3623   if (journal_dir == NULL)
3624     config_flush_at_shutdown = 1;
3625
3626   return (status);
3627 } /* }}} int read_options */
3628
3629 int main (int argc, char **argv)
3630 {
3631   int status;
3632
3633   status = read_options (argc, argv);
3634   if (status != 0)
3635   {
3636     if (status < 0)
3637       status = 0;
3638     return (status);
3639   }
3640
3641   status = daemonize ();
3642   if (status != 0)
3643   {
3644     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3645     return (1);
3646   }
3647
3648   journal_init();
3649
3650   /* start the queue threads */
3651   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3652   if (queue_threads == NULL)
3653   {
3654     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3655     cleanup();
3656     return (1);
3657   }
3658   for (int i = 0; i < config_queue_threads; i++)
3659   {
3660     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3661     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3662     if (status != 0)
3663     {
3664       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3665       cleanup();
3666       return (1);
3667     }
3668   }
3669
3670   /* start the flush thread */
3671   memset(&flush_thread, 0, sizeof(flush_thread));
3672   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3673   if (status != 0)
3674   {
3675     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3676     cleanup();
3677     return (1);
3678   }
3679
3680   listen_thread_main (NULL);
3681   cleanup ();
3682
3683   return (0);
3684 } /* int main */
3685
3686 /*
3687  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3688  */