rrdcached: Fix permissions of the default socket.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
115
116 #include <glib-2.0/glib.h>
117 /* }}} */
118
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
126
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
131
132 struct listen_socket_s
133 {
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
137
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
141
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
146
147   char *wbuf;
148   ssize_t wbuf_len;
149
150   uint32_t permissions;
151
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
156
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
164
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
167
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
171
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
177
178   char *syntax;
179   char *help;
180 };
181
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
199
200 struct callback_flush_data_s
201 {
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
208
209 enum queue_side_e
210 {
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
215
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
221
222 /* max length of socket command or response */
223 #define CMD_MAX 4096
224 #define RBUF_SIZE (CMD_MAX*2)
225
226 /*
227  * Variables
228  */
229 static int stay_foreground = 0;
230 static uid_t daemon_uid;
231
232 static listen_socket_t *listen_fds = NULL;
233 static size_t listen_fds_num = 0;
234
235 static listen_socket_t default_socket;
236
237 enum {
238   RUNNING,              /* normal operation */
239   FLUSHING,             /* flushing remaining values */
240   SHUTDOWN              /* shutting down */
241 } state = RUNNING;
242
243 static pthread_t *queue_threads;
244 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
245 static int config_queue_threads = 4;
246
247 static pthread_t flush_thread;
248 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
249
250 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
251 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
252 static int connection_threads_num = 0;
253
254 /* Cache stuff */
255 static GTree          *cache_tree = NULL;
256 static cache_item_t   *cache_queue_head = NULL;
257 static cache_item_t   *cache_queue_tail = NULL;
258 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
259
260 static int config_write_interval = 300;
261 static int config_write_jitter   = 0;
262 static int config_flush_interval = 3600;
263 static int config_flush_at_shutdown = 0;
264 static char *config_pid_file = NULL;
265 static char *config_base_dir = NULL;
266 static size_t _config_base_dir_len = 0;
267 static int config_write_base_only = 0;
268 static size_t config_alloc_chunk = 1;
269
270 static listen_socket_t **config_listen_address_list = NULL;
271 static size_t config_listen_address_list_len = 0;
272
273 static uint64_t stats_queue_length = 0;
274 static uint64_t stats_updates_received = 0;
275 static uint64_t stats_flush_received = 0;
276 static uint64_t stats_updates_written = 0;
277 static uint64_t stats_data_sets_written = 0;
278 static uint64_t stats_journal_bytes = 0;
279 static uint64_t stats_journal_rotate = 0;
280 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
281
282 static int opt_no_overwrite = 0; /* default for the daemon */
283
284 /* Journaled updates */
285 #define JOURNAL_REPLAY(s) ((s) == NULL)
286 #define JOURNAL_BASE "rrd.journal"
287 static journal_set *journal_cur = NULL;
288 static journal_set *journal_old = NULL;
289 static char *journal_dir = NULL;
290 static FILE *journal_fh = NULL;         /* current journal file handle */
291 static long  journal_size = 0;          /* current journal size */
292 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
293 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
294 static int journal_write(char *cmd, char *args);
295 static void journal_done(void);
296 static void journal_rotate(void);
297
298 /* prototypes for forward refernces */
299 static int handle_request_help (HANDLER_PROTO);
300
301 /* 
302  * Functions
303  */
304 static void sig_common (const char *sig) /* {{{ */
305 {
306   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
307   state = FLUSHING;
308   pthread_cond_broadcast(&flush_cond);
309   pthread_cond_broadcast(&queue_cond);
310 } /* }}} void sig_common */
311
312 static void sig_int_handler (int UNUSED(s)) /* {{{ */
313 {
314   sig_common("INT");
315 } /* }}} void sig_int_handler */
316
317 static void sig_term_handler (int UNUSED(s)) /* {{{ */
318 {
319   sig_common("TERM");
320 } /* }}} void sig_term_handler */
321
322 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
323 {
324   config_flush_at_shutdown = 1;
325   sig_common("USR1");
326 } /* }}} void sig_usr1_handler */
327
328 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
329 {
330   config_flush_at_shutdown = 0;
331   sig_common("USR2");
332 } /* }}} void sig_usr2_handler */
333
334 static void install_signal_handlers(void) /* {{{ */
335 {
336   /* These structures are static, because `sigaction' behaves weird if the are
337    * overwritten.. */
338   static struct sigaction sa_int;
339   static struct sigaction sa_term;
340   static struct sigaction sa_pipe;
341   static struct sigaction sa_usr1;
342   static struct sigaction sa_usr2;
343
344   /* Install signal handlers */
345   memset (&sa_int, 0, sizeof (sa_int));
346   sa_int.sa_handler = sig_int_handler;
347   sigaction (SIGINT, &sa_int, NULL);
348
349   memset (&sa_term, 0, sizeof (sa_term));
350   sa_term.sa_handler = sig_term_handler;
351   sigaction (SIGTERM, &sa_term, NULL);
352
353   memset (&sa_pipe, 0, sizeof (sa_pipe));
354   sa_pipe.sa_handler = SIG_IGN;
355   sigaction (SIGPIPE, &sa_pipe, NULL);
356
357   memset (&sa_pipe, 0, sizeof (sa_usr1));
358   sa_usr1.sa_handler = sig_usr1_handler;
359   sigaction (SIGUSR1, &sa_usr1, NULL);
360
361   memset (&sa_usr2, 0, sizeof (sa_usr2));
362   sa_usr2.sa_handler = sig_usr2_handler;
363   sigaction (SIGUSR2, &sa_usr2, NULL);
364
365 } /* }}} void install_signal_handlers */
366
367 static int open_pidfile(char *action, int oflag) /* {{{ */
368 {
369   int fd;
370   const char *file;
371   char *file_copy, *dir;
372
373   file = (config_pid_file != NULL)
374     ? config_pid_file
375     : LOCALSTATEDIR "/run/rrdcached.pid";
376
377   /* dirname may modify its argument */
378   file_copy = strdup(file);
379   if (file_copy == NULL)
380   {
381     fprintf(stderr, "rrdcached: strdup(): %s\n",
382         rrd_strerror(errno));
383     return -1;
384   }
385
386   dir = dirname(file_copy);
387   if (rrd_mkdir_p(dir, 0777) != 0)
388   {
389     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390         dir, rrd_strerror(errno));
391     return -1;
392   }
393
394   free(file_copy);
395
396   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
397   if (fd < 0)
398     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399             action, file, rrd_strerror(errno));
400
401   return(fd);
402 } /* }}} static int open_pidfile */
403
404 /* check existing pid file to see whether a daemon is running */
405 static int check_pidfile(void)
406 {
407   int pid_fd;
408   pid_t pid;
409   char pid_str[16];
410
411   pid_fd = open_pidfile("open", O_RDWR);
412   if (pid_fd < 0)
413     return pid_fd;
414
415   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
416     return -1;
417
418   pid = atoi(pid_str);
419   if (pid <= 0)
420     return -1;
421
422   /* another running process that we can signal COULD be
423    * a competing rrdcached */
424   if (pid != getpid() && kill(pid, 0) == 0)
425   {
426     fprintf(stderr,
427             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
431
432   lseek(pid_fd, 0, SEEK_SET);
433   if (ftruncate(pid_fd, 0) == -1)
434   {
435     fprintf(stderr,
436             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
437     close(pid_fd);
438     return -1;
439   }
440
441   fprintf(stderr,
442           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
443           "rrdcached: starting normally.\n", pid);
444
445   return pid_fd;
446 } /* }}} static int check_pidfile */
447
448 static int write_pidfile (int fd) /* {{{ */
449 {
450   pid_t pid;
451   FILE *fh;
452
453   pid = getpid ();
454
455   fh = fdopen (fd, "w");
456   if (fh == NULL)
457   {
458     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
459     close(fd);
460     return (-1);
461   }
462
463   fprintf (fh, "%i\n", (int) pid);
464   fclose (fh);
465
466   return (0);
467 } /* }}} int write_pidfile */
468
469 static int remove_pidfile (void) /* {{{ */
470 {
471   char *file;
472   int status;
473
474   file = (config_pid_file != NULL)
475     ? config_pid_file
476     : LOCALSTATEDIR "/run/rrdcached.pid";
477
478   status = unlink (file);
479   if (status == 0)
480     return (0);
481   return (errno);
482 } /* }}} int remove_pidfile */
483
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
485 {
486   char *eol;
487
488   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489                sock->next_read - sock->next_cmd);
490
491   if (eol == NULL)
492   {
493     /* no commands left, move remainder back to front of rbuf */
494     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495             sock->next_read - sock->next_cmd);
496     sock->next_read -= sock->next_cmd;
497     sock->next_cmd = 0;
498     *len = 0;
499     return NULL;
500   }
501   else
502   {
503     char *cmd = sock->rbuf + sock->next_cmd;
504     *eol = '\0';
505
506     sock->next_cmd = eol - sock->rbuf + 1;
507
508     if (eol > sock->rbuf && *(eol-1) == '\r')
509       *(--eol) = '\0'; /* handle "\r\n" EOL */
510
511     *len = eol - cmd;
512
513     return cmd;
514   }
515
516   /* NOTREACHED */
517   assert(1==0);
518 } /* }}} char *next_cmd */
519
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
522 {
523   char *new_buf;
524
525   assert(sock != NULL);
526
527   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
528   if (new_buf == NULL)
529   {
530     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
531     return -1;
532   }
533
534   strncpy(new_buf + sock->wbuf_len, str, len + 1);
535
536   sock->wbuf = new_buf;
537   sock->wbuf_len += len;
538
539   return 0;
540 } /* }}} static int add_to_wbuf */
541
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
544 {
545   va_list argp;
546   char buffer[CMD_MAX];
547   int len;
548
549   if (JOURNAL_REPLAY(sock)) return 0;
550   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
551
552   va_start(argp, fmt);
553 #ifdef HAVE_VSNPRINTF
554   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
555 #else
556   len = vsprintf(buffer, fmt, argp);
557 #endif
558   va_end(argp);
559   if (len < 0)
560   {
561     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
562     return -1;
563   }
564
565   return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
567
568 static int count_lines(char *str) /* {{{ */
569 {
570   int lines = 0;
571
572   if (str != NULL)
573   {
574     while ((str = strchr(str, '\n')) != NULL)
575     {
576       ++lines;
577       ++str;
578     }
579   }
580
581   return lines;
582 } /* }}} static int count_lines */
583
584 /* send the response back to the user.
585  * returns 0 on success, -1 on error
586  * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588                           char *fmt, ...) /* {{{ */
589 {
590   va_list argp;
591   char buffer[CMD_MAX];
592   int lines;
593   ssize_t wrote;
594   int rclen, len;
595
596   if (JOURNAL_REPLAY(sock)) return rc;
597
598   if (sock->batch_start)
599   {
600     if (rc == RESP_OK)
601       return rc; /* no response on success during BATCH */
602     lines = sock->batch_cmd;
603   }
604   else if (rc == RESP_OK)
605     lines = count_lines(sock->wbuf);
606   else
607     lines = -1;
608
609   rclen = sprintf(buffer, "%d ", lines);
610   va_start(argp, fmt);
611 #ifdef HAVE_VSNPRINTF
612   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
613 #else
614   len = vsprintf(buffer+rclen, fmt, argp);
615 #endif
616   va_end(argp);
617   if (len < 0)
618     return -1;
619
620   len += rclen;
621
622   /* append the result to the wbuf, don't write to the user */
623   if (sock->batch_start)
624     return add_to_wbuf(sock, buffer, len);
625
626   /* first write must be complete */
627   if (len != write(sock->fd, buffer, len))
628   {
629     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
630     return -1;
631   }
632
633   if (sock->wbuf != NULL && rc == RESP_OK)
634   {
635     wrote = 0;
636     while (wrote < sock->wbuf_len)
637     {
638       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
639       if (wb <= 0)
640       {
641         RRDD_LOG(LOG_INFO, "send_response: could not write results");
642         return -1;
643       }
644       wrote += wb;
645     }
646   }
647
648   free(sock->wbuf); sock->wbuf = NULL;
649   sock->wbuf_len = 0;
650
651   return 0;
652 } /* }}} */
653
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
655 {
656   ci->values = NULL;
657   ci->values_num = 0;
658   ci->values_alloc = 0;
659
660   ci->last_flush_time = when;
661   if (config_write_jitter > 0)
662     ci->last_flush_time += (rrd_random() % config_write_jitter);
663 }
664
665 /* remove_from_queue
666  * remove a "cache_item_t" item from the queue.
667  * must hold 'cache_lock' when calling this
668  */
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
670 {
671   if (ci == NULL) return;
672   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
673
674   if (ci->prev == NULL)
675     cache_queue_head = ci->next; /* reset head */
676   else
677     ci->prev->next = ci->next;
678
679   if (ci->next == NULL)
680     cache_queue_tail = ci->prev; /* reset the tail */
681   else
682     ci->next->prev = ci->prev;
683
684   ci->next = ci->prev = NULL;
685   ci->flags &= ~CI_FLAGS_IN_QUEUE;
686
687   pthread_mutex_lock (&stats_lock);
688   assert (stats_queue_length > 0);
689   stats_queue_length--;
690   pthread_mutex_unlock (&stats_lock);
691
692 } /* }}} static void remove_from_queue */
693
694 /* free the resources associated with the cache_item_t
695  * must hold cache_lock when calling this function
696  */
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
698 {
699   if (ci == NULL) return NULL;
700
701   remove_from_queue(ci);
702
703   for (size_t i=0; i < ci->values_num; i++)
704     free(ci->values[i]);
705
706   free (ci->values);
707   free (ci->file);
708
709   /* in case anyone is waiting */
710   pthread_cond_broadcast(&ci->flushed);
711   pthread_cond_destroy(&ci->flushed);
712
713   free (ci);
714
715   return NULL;
716 } /* }}} static void *free_cache_item */
717
718 /*
719  * enqueue_cache_item:
720  * `cache_lock' must be acquired before calling this function!
721  */
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
723     queue_side_t side)
724 {
725   if (ci == NULL)
726     return (-1);
727
728   if (ci->values_num == 0)
729     return (0);
730
731   if (side == HEAD)
732   {
733     if (cache_queue_head == ci)
734       return 0;
735
736     /* remove if further down in queue */
737     remove_from_queue(ci);
738
739     ci->prev = NULL;
740     ci->next = cache_queue_head;
741     if (ci->next != NULL)
742       ci->next->prev = ci;
743     cache_queue_head = ci;
744
745     if (cache_queue_tail == NULL)
746       cache_queue_tail = cache_queue_head;
747   }
748   else /* (side == TAIL) */
749   {
750     /* We don't move values back in the list.. */
751     if (ci->flags & CI_FLAGS_IN_QUEUE)
752       return (0);
753
754     assert (ci->next == NULL);
755     assert (ci->prev == NULL);
756
757     ci->prev = cache_queue_tail;
758
759     if (cache_queue_tail == NULL)
760       cache_queue_head = ci;
761     else
762       cache_queue_tail->next = ci;
763
764     cache_queue_tail = ci;
765   }
766
767   ci->flags |= CI_FLAGS_IN_QUEUE;
768
769   pthread_cond_signal(&queue_cond);
770   pthread_mutex_lock (&stats_lock);
771   stats_queue_length++;
772   pthread_mutex_unlock (&stats_lock);
773
774   return (0);
775 } /* }}} int enqueue_cache_item */
776
777 /*
778  * tree_callback_flush:
779  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780  * while this is in progress.
781  */
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
783     gpointer data)
784 {
785   cache_item_t *ci;
786   callback_flush_data_t *cfd;
787
788   ci = (cache_item_t *) value;
789   cfd = (callback_flush_data_t *) data;
790
791   if (ci->flags & CI_FLAGS_IN_QUEUE)
792     return FALSE;
793
794   if (ci->values_num > 0
795       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
796   {
797     enqueue_cache_item (ci, TAIL);
798   }
799   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800       && (ci->values_num <= 0))
801   {
802     assert ((char *) key == ci->file);
803     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
804     {
805       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
806       return (FALSE);
807     }
808   }
809
810   return (FALSE);
811 } /* }}} gboolean tree_callback_flush */
812
813 static int flush_old_values (int max_age)
814 {
815   callback_flush_data_t cfd;
816   size_t k;
817
818   memset (&cfd, 0, sizeof (cfd));
819   /* Pass the current time as user data so that we don't need to call
820    * `time' for each node. */
821   cfd.now = time (NULL);
822   cfd.keys = NULL;
823   cfd.keys_num = 0;
824
825   if (max_age > 0)
826     cfd.abs_timeout = cfd.now - max_age;
827   else
828     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
829
830   /* `tree_callback_flush' will return the keys of all values that haven't
831    * been touched in the last `config_flush_interval' seconds in `cfd'.
832    * The char*'s in this array point to the same memory as ci->file, so we
833    * don't need to free them separately. */
834   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
835
836   for (k = 0; k < cfd.keys_num; k++)
837   {
838     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839     /* should never fail, since we have held the cache_lock
840      * the entire time */
841     assert(status == TRUE);
842   }
843
844   if (cfd.keys != NULL)
845   {
846     free (cfd.keys);
847     cfd.keys = NULL;
848   }
849
850   return (0);
851 } /* int flush_old_values */
852
853 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
854 {
855   struct timeval now;
856   struct timespec next_flush;
857   int status;
858
859   gettimeofday (&now, NULL);
860   next_flush.tv_sec = now.tv_sec + config_flush_interval;
861   next_flush.tv_nsec = 1000 * now.tv_usec;
862
863   pthread_mutex_lock(&cache_lock);
864
865   while (state == RUNNING)
866   {
867     gettimeofday (&now, NULL);
868     if ((now.tv_sec > next_flush.tv_sec)
869         || ((now.tv_sec == next_flush.tv_sec)
870           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
871     {
872       RRDD_LOG(LOG_DEBUG, "flushing old values");
873
874       /* Determine the time of the next cache flush. */
875       next_flush.tv_sec = now.tv_sec + config_flush_interval;
876
877       /* Flush all values that haven't been written in the last
878        * `config_write_interval' seconds. */
879       flush_old_values (config_write_interval);
880
881       /* unlock the cache while we rotate so we don't block incoming
882        * updates if the fsync() blocks on disk I/O */
883       pthread_mutex_unlock(&cache_lock);
884       journal_rotate();
885       pthread_mutex_lock(&cache_lock);
886     }
887
888     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
889     if (status != 0 && status != ETIMEDOUT)
890     {
891       RRDD_LOG (LOG_ERR, "flush_thread_main: "
892                 "pthread_cond_timedwait returned %i.", status);
893     }
894   }
895
896   if (config_flush_at_shutdown)
897     flush_old_values (-1); /* flush everything */
898
899   state = SHUTDOWN;
900
901   pthread_mutex_unlock(&cache_lock);
902
903   return NULL;
904 } /* void *flush_thread_main */
905
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
907 {
908   pthread_mutex_lock (&cache_lock);
909
910   while (state != SHUTDOWN
911          || (cache_queue_head != NULL && config_flush_at_shutdown))
912   {
913     cache_item_t *ci;
914     char *file;
915     char **values;
916     size_t values_num;
917     int status;
918
919     /* Now, check if there's something to store away. If not, wait until
920      * something comes in. */
921     if (cache_queue_head == NULL)
922     {
923       status = pthread_cond_wait (&queue_cond, &cache_lock);
924       if ((status != 0) && (status != ETIMEDOUT))
925       {
926         RRDD_LOG (LOG_ERR, "queue_thread_main: "
927             "pthread_cond_wait returned %i.", status);
928       }
929     }
930
931     /* Check if a value has arrived. This may be NULL if we timed out or there
932      * was an interrupt such as a signal. */
933     if (cache_queue_head == NULL)
934       continue;
935
936     ci = cache_queue_head;
937
938     /* copy the relevant parts */
939     file = strdup (ci->file);
940     if (file == NULL)
941     {
942       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
943       continue;
944     }
945
946     assert(ci->values != NULL);
947     assert(ci->values_num > 0);
948
949     values = ci->values;
950     values_num = ci->values_num;
951
952     wipe_ci_values(ci, time(NULL));
953     remove_from_queue(ci);
954
955     pthread_mutex_unlock (&cache_lock);
956
957     rrd_clear_error ();
958     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
959     if (status != 0)
960     {
961       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962           "rrd_update_r (%s) failed with status %i. (%s)",
963           file, status, rrd_get_error());
964     }
965
966     journal_write("wrote", file);
967
968     /* Search again in the tree.  It's possible someone issued a "FORGET"
969      * while we were writing the update values. */
970     pthread_mutex_lock(&cache_lock);
971     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
972     if (ci)
973       pthread_cond_broadcast(&ci->flushed);
974     pthread_mutex_unlock(&cache_lock);
975
976     if (status == 0)
977     {
978       pthread_mutex_lock (&stats_lock);
979       stats_updates_written++;
980       stats_data_sets_written += values_num;
981       pthread_mutex_unlock (&stats_lock);
982     }
983
984     rrd_free_ptrs((void ***) &values, &values_num);
985     free(file);
986
987     pthread_mutex_lock (&cache_lock);
988   }
989   pthread_mutex_unlock (&cache_lock);
990
991   return (NULL);
992 } /* }}} void *queue_thread_main */
993
994 static int buffer_get_field (char **buffer_ret, /* {{{ */
995     size_t *buffer_size_ret, char **field_ret)
996 {
997   char *buffer;
998   size_t buffer_pos;
999   size_t buffer_size;
1000   char *field;
1001   size_t field_size;
1002   int status;
1003
1004   buffer = *buffer_ret;
1005   buffer_pos = 0;
1006   buffer_size = *buffer_size_ret;
1007   field = *buffer_ret;
1008   field_size = 0;
1009
1010   if (buffer_size <= 0)
1011     return (-1);
1012
1013   /* This is ensured by `handle_request'. */
1014   assert (buffer[buffer_size - 1] == '\0');
1015
1016   status = -1;
1017   while (buffer_pos < buffer_size)
1018   {
1019     /* Check for end-of-field or end-of-buffer */
1020     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1021     {
1022       field[field_size] = 0;
1023       field_size++;
1024       buffer_pos++;
1025       status = 0;
1026       break;
1027     }
1028     /* Handle escaped characters. */
1029     else if (buffer[buffer_pos] == '\\')
1030     {
1031       if (buffer_pos >= (buffer_size - 1))
1032         break;
1033       buffer_pos++;
1034       field[field_size] = buffer[buffer_pos];
1035       field_size++;
1036       buffer_pos++;
1037     }
1038     /* Normal operation */ 
1039     else
1040     {
1041       field[field_size] = buffer[buffer_pos];
1042       field_size++;
1043       buffer_pos++;
1044     }
1045   } /* while (buffer_pos < buffer_size) */
1046
1047   if (status != 0)
1048     return (status);
1049
1050   *buffer_ret = buffer + buffer_pos;
1051   *buffer_size_ret = buffer_size - buffer_pos;
1052   *field_ret = field;
1053
1054   return (0);
1055 } /* }}} int buffer_get_field */
1056
1057 /* if we're restricting writes to the base directory,
1058  * check whether the file falls within the dir
1059  * returns 1 if OK, otherwise 0
1060  */
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1062 {
1063   assert(file != NULL);
1064
1065   if (!config_write_base_only
1066       || JOURNAL_REPLAY(sock)
1067       || config_base_dir == NULL)
1068     return 1;
1069
1070   if (strstr(file, "../") != NULL) goto err;
1071
1072   /* relative paths without "../" are ok */
1073   if (*file != '/') return 1;
1074
1075   /* file must be of the format base + "/" + <1+ char filename> */
1076   if (strlen(file) < _config_base_dir_len + 2) goto err;
1077   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078   if (*(file + _config_base_dir_len) != '/') goto err;
1079
1080   return 1;
1081
1082 err:
1083   if (sock != NULL && sock->fd >= 0)
1084     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1085
1086   return 0;
1087 } /* }}} static int check_file_access */
1088
1089 /* when using a base dir, convert relative paths to absolute paths.
1090  * if necessary, modifies the "filename" pointer to point
1091  * to the new path created in "tmp".  "tmp" is provided
1092  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1093  *
1094  * this allows us to optimize for the expected case (absolute path)
1095  * with a no-op.
1096  */
1097 static void get_abs_path(char **filename, char *tmp)
1098 {
1099   assert(tmp != NULL);
1100   assert(filename != NULL && *filename != NULL);
1101
1102   if (config_base_dir == NULL || **filename == '/')
1103     return;
1104
1105   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1106   *filename = tmp;
1107 } /* }}} static int get_abs_path */
1108
1109 static int flush_file (const char *filename) /* {{{ */
1110 {
1111   cache_item_t *ci;
1112
1113   pthread_mutex_lock (&cache_lock);
1114
1115   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1116   if (ci == NULL)
1117   {
1118     pthread_mutex_unlock (&cache_lock);
1119     return (ENOENT);
1120   }
1121
1122   if (ci->values_num > 0)
1123   {
1124     /* Enqueue at head */
1125     enqueue_cache_item (ci, HEAD);
1126     pthread_cond_wait(&ci->flushed, &cache_lock);
1127   }
1128
1129   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1130    * may have been purged during our cond_wait() */
1131
1132   pthread_mutex_unlock(&cache_lock);
1133
1134   return (0);
1135 } /* }}} int flush_file */
1136
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1138 {
1139   char *err = "Syntax error.\n";
1140
1141   if (cmd && cmd->syntax)
1142     err = cmd->syntax;
1143
1144   return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
1146
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1148 {
1149   uint64_t copy_queue_length;
1150   uint64_t copy_updates_received;
1151   uint64_t copy_flush_received;
1152   uint64_t copy_updates_written;
1153   uint64_t copy_data_sets_written;
1154   uint64_t copy_journal_bytes;
1155   uint64_t copy_journal_rotate;
1156
1157   uint64_t tree_nodes_number;
1158   uint64_t tree_depth;
1159
1160   pthread_mutex_lock (&stats_lock);
1161   copy_queue_length       = stats_queue_length;
1162   copy_updates_received   = stats_updates_received;
1163   copy_flush_received     = stats_flush_received;
1164   copy_updates_written    = stats_updates_written;
1165   copy_data_sets_written  = stats_data_sets_written;
1166   copy_journal_bytes      = stats_journal_bytes;
1167   copy_journal_rotate     = stats_journal_rotate;
1168   pthread_mutex_unlock (&stats_lock);
1169
1170   pthread_mutex_lock (&cache_lock);
1171   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1173   pthread_mutex_unlock (&cache_lock);
1174
1175   add_response_info(sock,
1176                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1177   add_response_info(sock,
1178                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179   add_response_info(sock,
1180                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181   add_response_info(sock,
1182                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183   add_response_info(sock,
1184                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1189
1190   send_response(sock, RESP_OK, "Statistics follow\n");
1191
1192   return (0);
1193 } /* }}} int handle_request_stats */
1194
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1196 {
1197   char *file, file_tmp[PATH_MAX];
1198   int status;
1199
1200   status = buffer_get_field (&buffer, &buffer_size, &file);
1201   if (status != 0)
1202   {
1203     return syntax_error(sock,cmd);
1204   }
1205   else
1206   {
1207     pthread_mutex_lock(&stats_lock);
1208     stats_flush_received++;
1209     pthread_mutex_unlock(&stats_lock);
1210
1211     get_abs_path(&file, file_tmp);
1212     if (!check_file_access(file, sock)) return 0;
1213
1214     status = flush_file (file);
1215     if (status == 0)
1216       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217     else if (status == ENOENT)
1218     {
1219       /* no file in our tree; see whether it exists at all */
1220       struct stat statbuf;
1221
1222       memset(&statbuf, 0, sizeof(statbuf));
1223       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1225       else
1226         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1227     }
1228     else if (status < 0)
1229       return send_response(sock, RESP_ERR, "Internal error.\n");
1230     else
1231       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232   }
1233
1234   /* NOTREACHED */
1235   assert(1==0);
1236 } /* }}} int handle_request_flush */
1237
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1239 {
1240   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1241
1242   pthread_mutex_lock(&cache_lock);
1243   flush_old_values(-1);
1244   pthread_mutex_unlock(&cache_lock);
1245
1246   return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
1248
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1250 {
1251   int status;
1252   char *file, file_tmp[PATH_MAX];
1253   cache_item_t *ci;
1254
1255   status = buffer_get_field(&buffer, &buffer_size, &file);
1256   if (status != 0)
1257     return syntax_error(sock,cmd);
1258
1259   get_abs_path(&file, file_tmp);
1260
1261   pthread_mutex_lock(&cache_lock);
1262   ci = g_tree_lookup(cache_tree, file);
1263   if (ci == NULL)
1264   {
1265     pthread_mutex_unlock(&cache_lock);
1266     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267   }
1268
1269   for (size_t i=0; i < ci->values_num; i++)
1270     add_response_info(sock, "%s\n", ci->values[i]);
1271
1272   pthread_mutex_unlock(&cache_lock);
1273   return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
1275
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1277 {
1278   int status;
1279   gboolean found;
1280   char *file, file_tmp[PATH_MAX];
1281
1282   status = buffer_get_field(&buffer, &buffer_size, &file);
1283   if (status != 0)
1284     return syntax_error(sock,cmd);
1285
1286   get_abs_path(&file, file_tmp);
1287   if (!check_file_access(file, sock)) return 0;
1288
1289   pthread_mutex_lock(&cache_lock);
1290   found = g_tree_remove(cache_tree, file);
1291   pthread_mutex_unlock(&cache_lock);
1292
1293   if (found == TRUE)
1294   {
1295     if (!JOURNAL_REPLAY(sock))
1296       journal_write("forget", file);
1297
1298     return send_response(sock, RESP_OK, "Gone!\n");
1299   }
1300   else
1301     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1302
1303   /* NOTREACHED */
1304   assert(1==0);
1305 } /* }}} static int handle_request_forget */
1306
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1308 {
1309   cache_item_t *ci;
1310
1311   pthread_mutex_lock(&cache_lock);
1312
1313   ci = cache_queue_head;
1314   while (ci != NULL)
1315   {
1316     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1317     ci = ci->next;
1318   }
1319
1320   pthread_mutex_unlock(&cache_lock);
1321
1322   return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
1324
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1326 {
1327   char *file, file_tmp[PATH_MAX];
1328   int values_num = 0;
1329   int status;
1330   char orig_buf[CMD_MAX];
1331
1332   cache_item_t *ci;
1333
1334   /* save it for the journal later */
1335   if (!JOURNAL_REPLAY(sock))
1336     strncpy(orig_buf, buffer, buffer_size);
1337
1338   status = buffer_get_field (&buffer, &buffer_size, &file);
1339   if (status != 0)
1340     return syntax_error(sock,cmd);
1341
1342   pthread_mutex_lock(&stats_lock);
1343   stats_updates_received++;
1344   pthread_mutex_unlock(&stats_lock);
1345
1346   get_abs_path(&file, file_tmp);
1347   if (!check_file_access(file, sock)) return 0;
1348
1349   pthread_mutex_lock (&cache_lock);
1350   ci = g_tree_lookup (cache_tree, file);
1351
1352   if (ci == NULL) /* {{{ */
1353   {
1354     struct stat statbuf;
1355     cache_item_t *tmp;
1356
1357     /* don't hold the lock while we setup; stat(2) might block */
1358     pthread_mutex_unlock(&cache_lock);
1359
1360     memset (&statbuf, 0, sizeof (statbuf));
1361     status = stat (file, &statbuf);
1362     if (status != 0)
1363     {
1364       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1365
1366       status = errno;
1367       if (status == ENOENT)
1368         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1369       else
1370         return send_response(sock, RESP_ERR,
1371                              "stat failed with error %i.\n", status);
1372     }
1373     if (!S_ISREG (statbuf.st_mode))
1374       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1375
1376     if (access(file, R_OK|W_OK) != 0)
1377       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378                            file, rrd_strerror(errno));
1379
1380     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1381     if (ci == NULL)
1382     {
1383       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1384
1385       return send_response(sock, RESP_ERR, "malloc failed.\n");
1386     }
1387     memset (ci, 0, sizeof (cache_item_t));
1388
1389     ci->file = strdup (file);
1390     if (ci->file == NULL)
1391     {
1392       free (ci);
1393       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1394
1395       return send_response(sock, RESP_ERR, "strdup failed.\n");
1396     }
1397
1398     wipe_ci_values(ci, now);
1399     ci->flags = CI_FLAGS_IN_TREE;
1400     pthread_cond_init(&ci->flushed, NULL);
1401
1402     pthread_mutex_lock(&cache_lock);
1403
1404     /* another UPDATE might have added this entry in the meantime */
1405     tmp = g_tree_lookup (cache_tree, file);
1406     if (tmp == NULL)
1407       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1408     else
1409     {
1410       free_cache_item (ci);
1411       ci = tmp;
1412     }
1413
1414     /* state may have changed while we were unlocked */
1415     if (state == SHUTDOWN)
1416       return -1;
1417   } /* }}} */
1418   assert (ci != NULL);
1419
1420   /* don't re-write updates in replay mode */
1421   if (!JOURNAL_REPLAY(sock))
1422     journal_write("update", orig_buf);
1423
1424   while (buffer_size > 0)
1425   {
1426     char *value;
1427     time_t stamp;
1428     char *eostamp;
1429
1430     status = buffer_get_field (&buffer, &buffer_size, &value);
1431     if (status != 0)
1432     {
1433       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1434       break;
1435     }
1436
1437     /* make sure update time is always moving forward */
1438     stamp = strtol(value, &eostamp, 10);
1439     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "Cannot find timestamp in '%s'!\n", value);
1444     }
1445     else if (stamp <= ci->last_update_stamp)
1446     {
1447       pthread_mutex_unlock(&cache_lock);
1448       return send_response(sock, RESP_ERR,
1449                            "illegal attempt to update using time %ld when last"
1450                            " update time is %ld (minimum one second step)\n",
1451                            stamp, ci->last_update_stamp);
1452     }
1453     else
1454       ci->last_update_stamp = stamp;
1455
1456     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457                               &ci->values_alloc, config_alloc_chunk))
1458     {
1459       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1460       continue;
1461     }
1462
1463     values_num++;
1464   }
1465
1466   if (((now - ci->last_flush_time) >= config_write_interval)
1467       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468       && (ci->values_num > 0))
1469   {
1470     enqueue_cache_item (ci, TAIL);
1471   }
1472
1473   pthread_mutex_unlock (&cache_lock);
1474
1475   if (values_num < 1)
1476     return send_response(sock, RESP_ERR, "No values updated.\n");
1477   else
1478     return send_response(sock, RESP_OK,
1479                          "errors, enqueued %i value(s).\n", values_num);
1480
1481   /* NOTREACHED */
1482   assert(1==0);
1483
1484 } /* }}} int handle_request_update */
1485
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1487 {
1488   char *file, file_tmp[PATH_MAX];
1489   char *cf;
1490
1491   char *start_str;
1492   char *end_str;
1493   time_t start_tm;
1494   time_t end_tm;
1495
1496   unsigned long step;
1497   unsigned long ds_cnt;
1498   char **ds_namv;
1499   rrd_value_t *data;
1500
1501   int status;
1502   unsigned long i;
1503   time_t t;
1504   rrd_value_t *data_ptr;
1505
1506   file = NULL;
1507   cf = NULL;
1508   start_str = NULL;
1509   end_str = NULL;
1510
1511   /* Read the arguments */
1512   do /* while (0) */
1513   {
1514     status = buffer_get_field (&buffer, &buffer_size, &file);
1515     if (status != 0)
1516       break;
1517
1518     status = buffer_get_field (&buffer, &buffer_size, &cf);
1519     if (status != 0)
1520       break;
1521
1522     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1523     if (status != 0)
1524     {
1525       start_str = NULL;
1526       status = 0;
1527       break;
1528     }
1529
1530     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1531     if (status != 0)
1532     {
1533       end_str = NULL;
1534       status = 0;
1535       break;
1536     }
1537   } while (0);
1538
1539   if (status != 0)
1540     return (syntax_error(sock,cmd));
1541
1542   get_abs_path(&file, file_tmp);
1543   if (!check_file_access(file, sock)) return 0;
1544
1545   status = flush_file (file);
1546   if ((status != 0) && (status != ENOENT))
1547     return (send_response (sock, RESP_ERR,
1548           "flush_file (%s) failed with status %i.\n", file, status));
1549
1550   t = time (NULL); /* "now" */
1551
1552   /* Parse start time */
1553   if (start_str != NULL)
1554   {
1555     char *endptr;
1556     long value;
1557
1558     endptr = NULL;
1559     errno = 0;
1560     value = strtol (start_str, &endptr, /* base = */ 0);
1561     if ((endptr == start_str) || (errno != 0))
1562       return (send_response(sock, RESP_ERR,
1563             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1564             start_str));
1565
1566     if (value > 0)
1567       start_tm = (time_t) value;
1568     else
1569       start_tm = (time_t) (t + value);
1570   }
1571   else
1572   {
1573     start_tm = t - 86400;
1574   }
1575
1576   /* Parse end time */
1577   if (end_str != NULL)
1578   {
1579     char *endptr;
1580     long value;
1581
1582     endptr = NULL;
1583     errno = 0;
1584     value = strtol (end_str, &endptr, /* base = */ 0);
1585     if ((endptr == end_str) || (errno != 0))
1586       return (send_response(sock, RESP_ERR,
1587             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1588             end_str));
1589
1590     if (value > 0)
1591       end_tm = (time_t) value;
1592     else
1593       end_tm = (time_t) (t + value);
1594   }
1595   else
1596   {
1597     end_tm = t;
1598   }
1599
1600   step = -1;
1601   ds_cnt = 0;
1602   ds_namv = NULL;
1603   data = NULL;
1604
1605   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606       &ds_cnt, &ds_namv, &data);
1607   if (status != 0)
1608     return (send_response(sock, RESP_ERR,
1609           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1610
1611   add_response_info (sock, "FlushVersion: %lu\n", 1);
1612   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614   add_response_info (sock, "Step: %lu\n", step);
1615   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1616
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618     size_t str_len = strlen (str); \
1619     if ((buffer_fill + str_len) > sizeof (buffer)) \
1620       str_len = sizeof (buffer) - buffer_fill; \
1621     if (str_len > 0) { \
1622       strncpy (buffer + buffer_fill, str, str_len); \
1623       buffer_fill += str_len; \
1624       assert (buffer_fill <= sizeof (buffer)); \
1625       if (buffer_fill == sizeof (buffer)) \
1626         buffer[buffer_fill - 1] = 0; \
1627       else \
1628         buffer[buffer_fill] = 0; \
1629     } \
1630   } while (0)
1631
1632   { /* Add list of DS names */
1633     char linebuf[1024];
1634     size_t linebuf_fill;
1635
1636     memset (linebuf, 0, sizeof (linebuf));
1637     linebuf_fill = 0;
1638     for (i = 0; i < ds_cnt; i++)
1639     {
1640       if (i > 0)
1641         SSTRCAT (linebuf, " ", linebuf_fill);
1642       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643       rrd_freemem(ds_namv[i]);
1644     }
1645     rrd_freemem(ds_namv);
1646     add_response_info (sock, "DSName: %s\n", linebuf);
1647   }
1648
1649   /* Add the actual data */
1650   assert (step > 0);
1651   data_ptr = data;
1652   for (t = start_tm + step; t <= end_tm; t += step)
1653   {
1654     char linebuf[1024];
1655     size_t linebuf_fill;
1656     char tmp[128];
1657
1658     memset (linebuf, 0, sizeof (linebuf));
1659     linebuf_fill = 0;
1660     for (i = 0; i < ds_cnt; i++)
1661     {
1662       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663       tmp[sizeof (tmp) - 1] = 0;
1664       SSTRCAT (linebuf, tmp, linebuf_fill);
1665
1666       data_ptr++;
1667     }
1668
1669     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1670   } /* for (t) */
1671   rrd_freemem(data);
1672
1673   return (send_response (sock, RESP_OK, "Success\n"));
1674 #undef SSTRCAT
1675 } /* }}} int handle_request_fetch */
1676
1677 /* we came across a "WROTE" entry during journal replay.
1678  * throw away any values that we have accumulated for this file
1679  */
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1681 {
1682   cache_item_t *ci;
1683   const char *file = buffer;
1684
1685   pthread_mutex_lock(&cache_lock);
1686
1687   ci = g_tree_lookup(cache_tree, file);
1688   if (ci == NULL)
1689   {
1690     pthread_mutex_unlock(&cache_lock);
1691     return (0);
1692   }
1693
1694   if (ci->values)
1695     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1696
1697   wipe_ci_values(ci, now);
1698   remove_from_queue(ci);
1699
1700   pthread_mutex_unlock(&cache_lock);
1701   return (0);
1702 } /* }}} int handle_request_wrote */
1703
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1705 {
1706   char *file, file_tmp[PATH_MAX];
1707   int status;
1708   rrd_info_t *info;
1709
1710   /* obtain filename */
1711   status = buffer_get_field(&buffer, &buffer_size, &file);
1712   if (status != 0)
1713     return syntax_error(sock,cmd);
1714   /* get full pathname */
1715   get_abs_path(&file, file_tmp);
1716   if (!check_file_access(file, sock)) {
1717     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1718   }
1719   /* get data */
1720   rrd_clear_error ();
1721   info = rrd_info_r(file);
1722   if(!info) {
1723     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1724   }
1725   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726       switch (data->type) {
1727       case RD_I_VAL:
1728           if (isnan(data->value.u_val))
1729               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1730           else
1731               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1732           break;
1733       case RD_I_CNT:
1734           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1735           break;
1736       case RD_I_INT:
1737           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1738           break;
1739       case RD_I_STR:
1740           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1741           break;
1742       case RD_I_BLO:
1743           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744           break;
1745       }
1746   }
1747
1748   rrd_info_free(info);
1749
1750   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info  */
1752
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1754 {
1755   char *i, *file, file_tmp[PATH_MAX];
1756   int status;
1757   int idx;
1758   time_t t;
1759
1760   /* obtain filename */
1761   status = buffer_get_field(&buffer, &buffer_size, &file);
1762   if (status != 0)
1763     return syntax_error(sock,cmd);
1764   /* get full pathname */
1765   get_abs_path(&file, file_tmp);
1766   if (!check_file_access(file, sock)) {
1767     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1768   }
1769
1770   status = buffer_get_field(&buffer, &buffer_size, &i);
1771   if (status != 0)
1772     return syntax_error(sock,cmd);
1773   idx = atoi(i);
1774   if(idx<0) { 
1775     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1776   }
1777
1778   /* get data */
1779   rrd_clear_error ();
1780   t = rrd_first_r(file,idx);
1781   if(t<1) {
1782     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1783   }
1784   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first  */
1786
1787
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1789 {
1790   char *file, file_tmp[PATH_MAX];
1791   int status;
1792   time_t t, from_file, step;
1793   rrd_file_t * rrd_file;
1794   cache_item_t * ci;
1795   rrd_t rrd; 
1796
1797   /* obtain filename */
1798   status = buffer_get_field(&buffer, &buffer_size, &file);
1799   if (status != 0)
1800     return syntax_error(sock,cmd);
1801   /* get full pathname */
1802   get_abs_path(&file, file_tmp);
1803   if (!check_file_access(file, sock)) {
1804     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1805   }
1806   rrd_clear_error();
1807   rrd_init(&rrd);
1808   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1809   if(!rrd_file) {
1810     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1811   }
1812   from_file = rrd.live_head->last_up;
1813   step = rrd.stat_head->pdp_step;
1814   rrd_close(rrd_file);
1815   pthread_mutex_lock(&cache_lock);
1816   ci = g_tree_lookup(cache_tree, file);
1817   if (ci)
1818     t = ci->last_update_stamp;
1819   else
1820     t = from_file;
1821   pthread_mutex_unlock(&cache_lock);
1822   t -= t % step;
1823   rrd_free(&rrd);
1824   if(t<1) {
1825     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1826   }
1827   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last  */
1829
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1831 {
1832   char *file, file_tmp[PATH_MAX];
1833   char *tok;
1834   int ac = 0;
1835   char *av[128];
1836   int status;
1837   unsigned long step = 300;
1838   time_t last_up = time(NULL)-10;
1839   int no_overwrite = opt_no_overwrite;
1840
1841
1842   /* obtain filename */
1843   status = buffer_get_field(&buffer, &buffer_size, &file);
1844   if (status != 0)
1845     return syntax_error(sock,cmd);
1846   /* get full pathname */
1847   get_abs_path(&file, file_tmp);
1848   if (!check_file_access(file, sock)) {
1849     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1850   }
1851   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1852
1853   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854     if( ! strncmp(tok,"-b",2) ) {
1855       status = buffer_get_field(&buffer, &buffer_size, &tok );
1856       if (status != 0) return syntax_error(sock,cmd);
1857       last_up = (time_t) atol(tok);
1858       continue;
1859     }
1860     if( ! strncmp(tok,"-s",2) ) {
1861       status = buffer_get_field(&buffer, &buffer_size, &tok );
1862       if (status != 0) return syntax_error(sock,cmd);
1863       step = atol(tok);
1864       continue;
1865     }
1866     if( ! strncmp(tok,"-O",2) ) {
1867       no_overwrite = 1;
1868       continue;
1869     }
1870     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1872     return syntax_error(sock,cmd);
1873   }
1874   if(step<1) {
1875     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1876   }
1877   if (last_up < 3600 * 24 * 365 * 10) {
1878     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1879   }
1880
1881   rrd_clear_error ();
1882   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1883
1884   if(!status) {
1885     return send_response(sock, RESP_OK, "RRD created OK\n");
1886   }
1887   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create  */
1889
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
1892 {
1893   int status;
1894   if (sock->batch_start)
1895     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1896
1897   status = send_response(sock, RESP_OK,
1898                          "Go ahead.  End with dot '.' on its own line.\n");
1899   sock->batch_start = time(NULL);
1900   sock->batch_cmd = 0;
1901
1902   return status;
1903 } /* }}} static int batch_start */
1904
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
1907 {
1908   assert(sock->batch_start);
1909   sock->batch_start = 0;
1910   sock->batch_cmd  = 0;
1911   return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
1913
1914 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1915 {
1916   return -1;
1917 } /* }}} static int handle_request_quit */
1918
1919 static command_t list_of_commands[] = { /* {{{ */
1920   {
1921     "UPDATE",
1922     handle_request_update,
1923     CMD_CONTEXT_ANY,
1924     "UPDATE <filename> <values> [<values> ...]\n"
1925     ,
1926     "Adds the given file to the internal cache if it is not yet known and\n"
1927     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1928     "for details.\n"
1929     "\n"
1930     "Each <values> has the following form:\n"
1931     "  <values> = <time>:<value>[:<value>[...]]\n"
1932     "See the rrdupdate(1) manpage for details.\n"
1933   },
1934   {
1935     "WROTE",
1936     handle_request_wrote,
1937     CMD_CONTEXT_JOURNAL,
1938     NULL,
1939     NULL
1940   },
1941   {
1942     "FLUSH",
1943     handle_request_flush,
1944     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945     "FLUSH <filename>\n"
1946     ,
1947     "Adds the given filename to the head of the update queue and returns\n"
1948     "after it has been dequeued.\n"
1949   },
1950   {
1951     "FLUSHALL",
1952     handle_request_flushall,
1953     CMD_CONTEXT_CLIENT,
1954     "FLUSHALL\n"
1955     ,
1956     "Triggers writing of all pending updates.  Returns immediately.\n"
1957   },
1958   {
1959     "PENDING",
1960     handle_request_pending,
1961     CMD_CONTEXT_CLIENT,
1962     "PENDING <filename>\n"
1963     ,
1964     "Shows any 'pending' updates for a file, in order.\n"
1965     "The updates shown have not yet been written to the underlying RRD file.\n"
1966   },
1967   {
1968     "FORGET",
1969     handle_request_forget,
1970     CMD_CONTEXT_ANY,
1971     "FORGET <filename>\n"
1972     ,
1973     "Removes the file completely from the cache.\n"
1974     "Any pending updates for the file will be lost.\n"
1975   },
1976   {
1977     "QUEUE",
1978     handle_request_queue,
1979     CMD_CONTEXT_CLIENT,
1980     "QUEUE\n"
1981     ,
1982         "Shows all files in the output queue.\n"
1983     "The output is zero or more lines in the following format:\n"
1984     "(where <num_vals> is the number of values to be written)\n"
1985     "\n"
1986     "<num_vals> <filename>\n"
1987   },
1988   {
1989     "STATS",
1990     handle_request_stats,
1991     CMD_CONTEXT_CLIENT,
1992     "STATS\n"
1993     ,
1994     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995     "a description of the values.\n"
1996   },
1997   {
1998     "HELP",
1999     handle_request_help,
2000     CMD_CONTEXT_CLIENT,
2001     "HELP [<command>]\n",
2002     NULL, /* special! */
2003   },
2004   {
2005     "BATCH",
2006     batch_start,
2007     CMD_CONTEXT_CLIENT,
2008     "BATCH\n"
2009     ,
2010     "The 'BATCH' command permits the client to initiate a bulk load\n"
2011     "   of commands to rrdcached.\n"
2012     "\n"
2013     "Usage:\n"
2014     "\n"
2015     "    client: BATCH\n"
2016     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2017     "    client: command #1\n"
2018     "    client: command #2\n"
2019     "    client: ... and so on\n"
2020     "    client: .\n"
2021     "    server: 2 errors\n"
2022     "    server: 7 message for command #7\n"
2023     "    server: 9 message for command #9\n"
2024     "\n"
2025     "For more information, consult the rrdcached(1) documentation.\n"
2026   },
2027   {
2028     ".",   /* BATCH terminator */
2029     batch_done,
2030     CMD_CONTEXT_BATCH,
2031     NULL,
2032     NULL
2033   },
2034   {
2035     "FETCH",
2036     handle_request_fetch,
2037     CMD_CONTEXT_CLIENT,
2038     "FETCH <file> <CF> [<start> [<end>]]\n"
2039     ,
2040     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2041   },
2042   {
2043     "INFO",
2044     handle_request_info,
2045     CMD_CONTEXT_CLIENT,
2046     "INFO <filename>\n",
2047     "The INFO command retrieves information about a specified RRD file.\n"
2048     "This is returned in standard rrdinfo format, a sequence of lines\n"
2049     "with the format <keyname> = <value>\n"
2050     "Note that this is the data as of the last update of the RRD file itself,\n"
2051     "not the last time data was received via rrdcached, so there may be pending\n"
2052     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2053   },
2054   {
2055     "FIRST",
2056     handle_request_first,
2057     CMD_CONTEXT_CLIENT,
2058     "FIRST <filename> <rra index>\n",
2059     "The FIRST command retrieves the first data time for a specified RRA in\n"
2060     "an RRD file.\n"
2061   },
2062   {
2063     "LAST",
2064     handle_request_last,
2065     CMD_CONTEXT_CLIENT,
2066     "LAST <filename>\n",
2067     "The LAST command retrieves the last update time for a specified RRD file.\n"
2068     "Note that this is the time of the last update of the RRD file itself, not\n"
2069     "the last time data was received via rrdcached, so there may be pending\n"
2070     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2071   },
2072   {
2073     "CREATE",
2074     handle_request_create,
2075     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077     "The CREATE command will create an RRD file, overwriting any existing file\n"
2078     "unless the -O option is given or rrdcached was started with the -O option.\n"
2079     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080     "not acceptable) and the step is in seconds (default is 300).\n"
2081     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2082   },
2083   {
2084     "QUIT",
2085     handle_request_quit,
2086     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2087     "QUIT\n"
2088     ,
2089     "Disconnect from rrdcached.\n"
2090   }
2091 }; /* }}} command_t list_of_commands[] */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093   / sizeof (list_of_commands[0]);
2094
2095 static command_t *find_command(char *cmd)
2096 {
2097   size_t i;
2098
2099   for (i = 0; i < list_of_commands_len; i++)
2100     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101       return (&list_of_commands[i]);
2102   return NULL;
2103 }
2104
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2107  * outside these functions so that switching to a more elegant storage method
2108  * is easily possible. */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2110 {
2111   size_t i;
2112
2113   for (i = 0; i < list_of_commands_len; i++)
2114     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115       return ((ssize_t) i);
2116   return (-1);
2117 } /* }}} ssize_t find_command_index */
2118
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2120     const char *cmd)
2121 {
2122   ssize_t i;
2123
2124   if (JOURNAL_REPLAY(sock))
2125     return (1);
2126
2127   if (cmd == NULL)
2128     return (-1);
2129
2130   if ((strcasecmp ("QUIT", cmd) == 0)
2131       || (strcasecmp ("HELP", cmd) == 0))
2132     return (1);
2133   else if (strcmp (".", cmd) == 0)
2134     cmd = "BATCH";
2135
2136   i = find_command_index (cmd);
2137   if (i < 0)
2138     return (-1);
2139   assert (i < 32);
2140
2141   if ((sock->permissions & (1 << i)) != 0)
2142     return (1);
2143   return (0);
2144 } /* }}} int socket_permission_check */
2145
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2147     const char *cmd)
2148 {
2149   ssize_t i;
2150
2151   i = find_command_index (cmd);
2152   if (i < 0)
2153     return (-1);
2154   assert (i < 32);
2155
2156   sock->permissions |= (1 << i);
2157   return (0);
2158 } /* }}} int socket_permission_add */
2159
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2161 {
2162   sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
2164
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166     listen_socket_t *src)
2167 {
2168   dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
2170
2171 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2172 {
2173   size_t i;
2174
2175   sock->permissions = 0;
2176   for (i = 0; i < list_of_commands_len; i++)
2177     sock->permissions |= (1 << i);
2178 } /* }}} void socket_permission_set_all */
2179
2180 /* check whether commands are received in the expected context */
2181 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2182 {
2183   if (JOURNAL_REPLAY(sock))
2184     return (cmd->context & CMD_CONTEXT_JOURNAL);
2185   else if (sock->batch_start)
2186     return (cmd->context & CMD_CONTEXT_BATCH);
2187   else
2188     return (cmd->context & CMD_CONTEXT_CLIENT);
2189
2190   /* NOTREACHED */
2191   assert(1==0);
2192 }
2193
2194 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2195 {
2196   int status;
2197   char *cmd_str;
2198   char *resp_txt;
2199   command_t *help = NULL;
2200
2201   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2202   if (status == 0)
2203     help = find_command(cmd_str);
2204
2205   if (help && (help->syntax || help->help))
2206   {
2207     char tmp[CMD_MAX];
2208
2209     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2210     resp_txt = tmp;
2211
2212     if (help->syntax)
2213       add_response_info(sock, "Usage: %s\n", help->syntax);
2214
2215     if (help->help)
2216       add_response_info(sock, "%s\n", help->help);
2217   }
2218   else
2219   {
2220     size_t i;
2221
2222     resp_txt = "Command overview\n";
2223
2224     for (i = 0; i < list_of_commands_len; i++)
2225     {
2226       if (list_of_commands[i].syntax == NULL)
2227         continue;
2228       add_response_info (sock, "%s", list_of_commands[i].syntax);
2229     }
2230   }
2231
2232   return send_response(sock, RESP_OK, resp_txt);
2233 } /* }}} int handle_request_help */
2234
2235 static int handle_request (DISPATCH_PROTO) /* {{{ */
2236 {
2237   char *buffer_ptr = buffer;
2238   char *cmd_str = NULL;
2239   command_t *cmd = NULL;
2240   int status;
2241
2242   assert (buffer[buffer_size - 1] == '\0');
2243
2244   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2245   if (status != 0)
2246   {
2247     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2248     return (-1);
2249   }
2250
2251   if (sock != NULL && sock->batch_start)
2252     sock->batch_cmd++;
2253
2254   cmd = find_command(cmd_str);
2255   if (!cmd)
2256     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2257
2258   if (!socket_permission_check (sock, cmd->cmd))
2259     return send_response(sock, RESP_ERR, "Permission denied.\n");
2260
2261   if (!command_check_context(sock, cmd))
2262     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2263
2264   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2265 } /* }}} int handle_request */
2266
2267 static void journal_set_free (journal_set *js) /* {{{ */
2268 {
2269   if (js == NULL)
2270     return;
2271
2272   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2273
2274   free(js);
2275 } /* }}} journal_set_free */
2276
2277 static void journal_set_remove (journal_set *js) /* {{{ */
2278 {
2279   if (js == NULL)
2280     return;
2281
2282   for (uint i=0; i < js->files_num; i++)
2283   {
2284     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2285     unlink(js->files[i]);
2286   }
2287 } /* }}} journal_set_remove */
2288
2289 /* close current journal file handle.
2290  * MUST hold journal_lock before calling */
2291 static void journal_close(void) /* {{{ */
2292 {
2293   if (journal_fh != NULL)
2294   {
2295     if (fclose(journal_fh) != 0)
2296       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2297   }
2298
2299   journal_fh = NULL;
2300   journal_size = 0;
2301 } /* }}} journal_close */
2302
2303 /* MUST hold journal_lock before calling */
2304 static void journal_new_file(void) /* {{{ */
2305 {
2306   struct timeval now;
2307   int  new_fd;
2308   char new_file[PATH_MAX + 1];
2309
2310   assert(journal_dir != NULL);
2311   assert(journal_cur != NULL);
2312
2313   journal_close();
2314
2315   gettimeofday(&now, NULL);
2316   /* this format assures that the files sort in strcmp() order */
2317   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2318            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2319
2320   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2321                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2322   if (new_fd < 0)
2323     goto error;
2324
2325   journal_fh = fdopen(new_fd, "a");
2326   if (journal_fh == NULL)
2327     goto error;
2328
2329   journal_size = ftell(journal_fh);
2330   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2331
2332   /* record the file in the journal set */
2333   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2334
2335   return;
2336
2337 error:
2338   RRDD_LOG(LOG_CRIT,
2339            "JOURNALING DISABLED: Error while trying to create %s : %s",
2340            new_file, rrd_strerror(errno));
2341   RRDD_LOG(LOG_CRIT,
2342            "JOURNALING DISABLED: All values will be flushed at shutdown");
2343
2344   close(new_fd);
2345   config_flush_at_shutdown = 1;
2346
2347 } /* }}} journal_new_file */
2348
2349 /* MUST NOT hold journal_lock before calling this */
2350 static void journal_rotate(void) /* {{{ */
2351 {
2352   journal_set *old_js = NULL;
2353
2354   if (journal_dir == NULL)
2355     return;
2356
2357   RRDD_LOG(LOG_DEBUG, "rotating journals");
2358
2359   pthread_mutex_lock(&stats_lock);
2360   ++stats_journal_rotate;
2361   pthread_mutex_unlock(&stats_lock);
2362
2363   pthread_mutex_lock(&journal_lock);
2364
2365   journal_close();
2366
2367   /* rotate the journal sets */
2368   old_js = journal_old;
2369   journal_old = journal_cur;
2370   journal_cur = calloc(1, sizeof(journal_set));
2371
2372   if (journal_cur != NULL)
2373     journal_new_file();
2374   else
2375     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2376
2377   pthread_mutex_unlock(&journal_lock);
2378
2379   journal_set_remove(old_js);
2380   journal_set_free  (old_js);
2381
2382 } /* }}} static void journal_rotate */
2383
2384 /* MUST hold journal_lock when calling */
2385 static void journal_done(void) /* {{{ */
2386 {
2387   if (journal_cur == NULL)
2388     return;
2389
2390   journal_close();
2391
2392   if (config_flush_at_shutdown)
2393   {
2394     RRDD_LOG(LOG_INFO, "removing journals");
2395     journal_set_remove(journal_old);
2396     journal_set_remove(journal_cur);
2397   }
2398   else
2399   {
2400     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2401              "journals will be used at next startup");
2402   }
2403
2404   journal_set_free(journal_cur);
2405   journal_set_free(journal_old);
2406   free(journal_dir);
2407
2408 } /* }}} static void journal_done */
2409
2410 static int journal_write(char *cmd, char *args) /* {{{ */
2411 {
2412   int chars;
2413
2414   if (journal_fh == NULL)
2415     return 0;
2416
2417   pthread_mutex_lock(&journal_lock);
2418   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2419   journal_size += chars;
2420
2421   if (journal_size > JOURNAL_MAX)
2422     journal_new_file();
2423
2424   pthread_mutex_unlock(&journal_lock);
2425
2426   if (chars > 0)
2427   {
2428     pthread_mutex_lock(&stats_lock);
2429     stats_journal_bytes += chars;
2430     pthread_mutex_unlock(&stats_lock);
2431   }
2432
2433   return chars;
2434 } /* }}} static int journal_write */
2435
2436 static int journal_replay (const char *file) /* {{{ */
2437 {
2438   FILE *fh;
2439   int entry_cnt = 0;
2440   int fail_cnt = 0;
2441   uint64_t line = 0;
2442   char entry[CMD_MAX];
2443   time_t now;
2444
2445   if (file == NULL) return 0;
2446
2447   {
2448     char *reason = "unknown error";
2449     int status = 0;
2450     struct stat statbuf;
2451
2452     memset(&statbuf, 0, sizeof(statbuf));
2453     if (stat(file, &statbuf) != 0)
2454     {
2455       reason = "stat error";
2456       status = errno;
2457     }
2458     else if (!S_ISREG(statbuf.st_mode))
2459     {
2460       reason = "not a regular file";
2461       status = EPERM;
2462     }
2463     if (statbuf.st_uid != daemon_uid)
2464     {
2465       reason = "not owned by daemon user";
2466       status = EACCES;
2467     }
2468     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2469     {
2470       reason = "must not be user/group writable";
2471       status = EACCES;
2472     }
2473
2474     if (status != 0)
2475     {
2476       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2477                file, rrd_strerror(status), reason);
2478       return 0;
2479     }
2480   }
2481
2482   fh = fopen(file, "r");
2483   if (fh == NULL)
2484   {
2485     if (errno != ENOENT)
2486       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2487                file, rrd_strerror(errno));
2488     return 0;
2489   }
2490   else
2491     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2492
2493   now = time(NULL);
2494
2495   while(!feof(fh))
2496   {
2497     size_t entry_len;
2498
2499     ++line;
2500     if (fgets(entry, sizeof(entry), fh) == NULL)
2501       break;
2502     entry_len = strlen(entry);
2503
2504     /* check \n termination in case journal writing crashed mid-line */
2505     if (entry_len == 0)
2506       continue;
2507     else if (entry[entry_len - 1] != '\n')
2508     {
2509       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2510       ++fail_cnt;
2511       continue;
2512     }
2513
2514     entry[entry_len - 1] = '\0';
2515
2516     if (handle_request(NULL, now, entry, entry_len) == 0)
2517       ++entry_cnt;
2518     else
2519       ++fail_cnt;
2520   }
2521
2522   fclose(fh);
2523
2524   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2525            entry_cnt, fail_cnt);
2526
2527   return entry_cnt > 0 ? 1 : 0;
2528 } /* }}} static int journal_replay */
2529
2530 static int journal_sort(const void *v1, const void *v2)
2531 {
2532   char **jn1 = (char **) v1;
2533   char **jn2 = (char **) v2;
2534
2535   return strcmp(*jn1,*jn2);
2536 }
2537
2538 static void journal_init(void) /* {{{ */
2539 {
2540   int had_journal = 0;
2541   DIR *dir;
2542   struct dirent *dent;
2543   char path[PATH_MAX+1];
2544
2545   if (journal_dir == NULL) return;
2546
2547   pthread_mutex_lock(&journal_lock);
2548
2549   journal_cur = calloc(1, sizeof(journal_set));
2550   if (journal_cur == NULL)
2551   {
2552     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2553     return;
2554   }
2555
2556   RRDD_LOG(LOG_INFO, "checking for journal files");
2557
2558   /* Handle old journal files during transition.  This gives them the
2559    * correct sort order.  TODO: remove after first release
2560    */
2561   {
2562     char old_path[PATH_MAX+1];
2563     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2564     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2565     rename(old_path, path);
2566
2567     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2568     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2569     rename(old_path, path);
2570   }
2571
2572   dir = opendir(journal_dir);
2573   if (!dir) {
2574     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2575     return;
2576   }
2577   while ((dent = readdir(dir)) != NULL)
2578   {
2579     /* looks like a journal file? */
2580     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2581       continue;
2582
2583     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2584
2585     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2586     {
2587       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2588                dent->d_name);
2589       break;
2590     }
2591   }
2592   closedir(dir);
2593
2594   qsort(journal_cur->files, journal_cur->files_num,
2595         sizeof(journal_cur->files[0]), journal_sort);
2596
2597   for (uint i=0; i < journal_cur->files_num; i++)
2598     had_journal += journal_replay(journal_cur->files[i]);
2599
2600   journal_new_file();
2601
2602   /* it must have been a crash.  start a flush */
2603   if (had_journal && config_flush_at_shutdown)
2604     flush_old_values(-1);
2605
2606   pthread_mutex_unlock(&journal_lock);
2607
2608   RRDD_LOG(LOG_INFO, "journal processing complete");
2609
2610 } /* }}} static void journal_init */
2611
2612 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2613 {
2614   assert(sock != NULL);
2615
2616   free(sock->rbuf);  sock->rbuf = NULL;
2617   free(sock->wbuf);  sock->wbuf = NULL;
2618   free(sock);
2619 } /* }}} void free_listen_socket */
2620
2621 static void close_connection(listen_socket_t *sock) /* {{{ */
2622 {
2623   if (sock->fd >= 0)
2624   {
2625     close(sock->fd);
2626     sock->fd = -1;
2627   }
2628
2629   free_listen_socket(sock);
2630
2631 } /* }}} void close_connection */
2632
2633 static void *connection_thread_main (void *args) /* {{{ */
2634 {
2635   listen_socket_t *sock;
2636   int fd;
2637
2638   sock = (listen_socket_t *) args;
2639   fd = sock->fd;
2640
2641   /* init read buffers */
2642   sock->next_read = sock->next_cmd = 0;
2643   sock->rbuf = malloc(RBUF_SIZE);
2644   if (sock->rbuf == NULL)
2645   {
2646     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2647     close_connection(sock);
2648     return NULL;
2649   }
2650
2651   pthread_mutex_lock (&connection_threads_lock);
2652 #ifdef HAVE_LIBWRAP
2653   /* LIBWRAP does not support multiple threads! By putting this code
2654      inside pthread_mutex_lock we do not have to worry about request_info
2655      getting overwritten by another thread.
2656   */
2657   struct request_info req;
2658   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2659   fromhost(&req);
2660   if(!hosts_access(&req)) {
2661     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2662     pthread_mutex_unlock (&connection_threads_lock);
2663     close_connection(sock);
2664     return NULL;
2665   }
2666 #endif /* HAVE_LIBWRAP */
2667   connection_threads_num++;
2668   pthread_mutex_unlock (&connection_threads_lock);
2669
2670   while (state == RUNNING)
2671   {
2672     char *cmd;
2673     ssize_t cmd_len;
2674     ssize_t rbytes;
2675     time_t now;
2676
2677     struct pollfd pollfd;
2678     int status;
2679
2680     pollfd.fd = fd;
2681     pollfd.events = POLLIN | POLLPRI;
2682     pollfd.revents = 0;
2683
2684     status = poll (&pollfd, 1, /* timeout = */ 500);
2685     if (state != RUNNING)
2686       break;
2687     else if (status == 0) /* timeout */
2688       continue;
2689     else if (status < 0) /* error */
2690     {
2691       status = errno;
2692       if (status != EINTR)
2693         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2694       continue;
2695     }
2696
2697     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2698       break;
2699     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2700     {
2701       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2702           "poll(2) returned something unexpected: %#04hx",
2703           pollfd.revents);
2704       break;
2705     }
2706
2707     rbytes = read(fd, sock->rbuf + sock->next_read,
2708                   RBUF_SIZE - sock->next_read);
2709     if (rbytes < 0)
2710     {
2711       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2712       break;
2713     }
2714     else if (rbytes == 0)
2715       break; /* eof */
2716
2717     sock->next_read += rbytes;
2718
2719     if (sock->batch_start)
2720       now = sock->batch_start;
2721     else
2722       now = time(NULL);
2723
2724     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2725     {
2726       status = handle_request (sock, now, cmd, cmd_len+1);
2727       if (status != 0)
2728         goto out_close;
2729     }
2730   }
2731
2732 out_close:
2733   close_connection(sock);
2734
2735   /* Remove this thread from the connection threads list */
2736   pthread_mutex_lock (&connection_threads_lock);
2737   connection_threads_num--;
2738   if (connection_threads_num <= 0)
2739     pthread_cond_broadcast(&connection_threads_done);
2740   pthread_mutex_unlock (&connection_threads_lock);
2741
2742   return (NULL);
2743 } /* }}} void *connection_thread_main */
2744
2745 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2746 {
2747   int fd;
2748   struct sockaddr_un sa;
2749   listen_socket_t *temp;
2750   int status;
2751   const char *path;
2752   char *path_copy, *dir;
2753
2754   path = sock->addr;
2755   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2756     path += strlen("unix:");
2757
2758   /* dirname may modify its argument */
2759   path_copy = strdup(path);
2760   if (path_copy == NULL)
2761   {
2762     fprintf(stderr, "rrdcached: strdup(): %s\n",
2763         rrd_strerror(errno));
2764     return (-1);
2765   }
2766
2767   dir = dirname(path_copy);
2768   if (rrd_mkdir_p(dir, 0777) != 0)
2769   {
2770     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2771         dir, rrd_strerror(errno));
2772     return (-1);
2773   }
2774
2775   free(path_copy);
2776
2777   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2778       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2779   if (temp == NULL)
2780   {
2781     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2782     return (-1);
2783   }
2784   listen_fds = temp;
2785   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2786
2787   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2788   if (fd < 0)
2789   {
2790     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2791              rrd_strerror(errno));
2792     return (-1);
2793   }
2794
2795   memset (&sa, 0, sizeof (sa));
2796   sa.sun_family = AF_UNIX;
2797   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2798
2799   /* if we've gotten this far, we own the pid file.  any daemon started
2800    * with the same args must not be alive.  therefore, ensure that we can
2801    * create the socket...
2802    */
2803   unlink(path);
2804
2805   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2806   if (status != 0)
2807   {
2808     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2809              path, rrd_strerror(errno));
2810     close (fd);
2811     return (-1);
2812   }
2813
2814   /* tweak the sockets group ownership */
2815   if (sock->socket_group != (gid_t)-1)
2816   {
2817     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2818          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2819     {
2820       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2821     }
2822   }
2823
2824   if (sock->socket_permissions != (mode_t)-1)
2825   {
2826     if (chmod(path, sock->socket_permissions) != 0)
2827       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2828           (unsigned int)sock->socket_permissions, strerror(errno));
2829   }
2830
2831   status = listen (fd, /* backlog = */ 10);
2832   if (status != 0)
2833   {
2834     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2835              path, rrd_strerror(errno));
2836     close (fd);
2837     unlink (path);
2838     return (-1);
2839   }
2840
2841   listen_fds[listen_fds_num].fd = fd;
2842   listen_fds[listen_fds_num].family = PF_UNIX;
2843   strncpy(listen_fds[listen_fds_num].addr, path,
2844           sizeof (listen_fds[listen_fds_num].addr) - 1);
2845   listen_fds_num++;
2846
2847   return (0);
2848 } /* }}} int open_listen_socket_unix */
2849
2850 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2851 {
2852   struct addrinfo ai_hints;
2853   struct addrinfo *ai_res;
2854   struct addrinfo *ai_ptr;
2855   char addr_copy[NI_MAXHOST];
2856   char *addr;
2857   char *port;
2858   int status;
2859
2860   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2861   addr_copy[sizeof (addr_copy) - 1] = 0;
2862   addr = addr_copy;
2863
2864   memset (&ai_hints, 0, sizeof (ai_hints));
2865   ai_hints.ai_flags = 0;
2866 #ifdef AI_ADDRCONFIG
2867   ai_hints.ai_flags |= AI_ADDRCONFIG;
2868 #endif
2869   ai_hints.ai_family = AF_UNSPEC;
2870   ai_hints.ai_socktype = SOCK_STREAM;
2871
2872   port = NULL;
2873   if (*addr == '[') /* IPv6+port format */
2874   {
2875     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2876     addr++;
2877
2878     port = strchr (addr, ']');
2879     if (port == NULL)
2880     {
2881       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2882       return (-1);
2883     }
2884     *port = 0;
2885     port++;
2886
2887     if (*port == ':')
2888       port++;
2889     else if (*port == 0)
2890       port = NULL;
2891     else
2892     {
2893       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2894       return (-1);
2895     }
2896   } /* if (*addr == '[') */
2897   else
2898   {
2899     port = rindex(addr, ':');
2900     if (port != NULL)
2901     {
2902       *port = 0;
2903       port++;
2904     }
2905   }
2906   ai_res = NULL;
2907   status = getaddrinfo (addr,
2908                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2909                         &ai_hints, &ai_res);
2910   if (status != 0)
2911   {
2912     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2913              addr, gai_strerror (status));
2914     return (-1);
2915   }
2916
2917   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2918   {
2919     int fd;
2920     listen_socket_t *temp;
2921     int one = 1;
2922
2923     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2924         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2925     if (temp == NULL)
2926     {
2927       fprintf (stderr,
2928                "rrdcached: open_listen_socket_network: realloc failed.\n");
2929       continue;
2930     }
2931     listen_fds = temp;
2932     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2933
2934     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2935     if (fd < 0)
2936     {
2937       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2938                rrd_strerror(errno));
2939       continue;
2940     }
2941
2942     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2943
2944     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2945     if (status != 0)
2946     {
2947       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2948                sock->addr, rrd_strerror(errno));
2949       close (fd);
2950       continue;
2951     }
2952
2953     status = listen (fd, /* backlog = */ 10);
2954     if (status != 0)
2955     {
2956       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2957                sock->addr, rrd_strerror(errno));
2958       close (fd);
2959       freeaddrinfo(ai_res);
2960       return (-1);
2961     }
2962
2963     listen_fds[listen_fds_num].fd = fd;
2964     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2965     listen_fds_num++;
2966   } /* for (ai_ptr) */
2967
2968   freeaddrinfo(ai_res);
2969   return (0);
2970 } /* }}} static int open_listen_socket_network */
2971
2972 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2973 {
2974   assert(sock != NULL);
2975   assert(sock->addr != NULL);
2976
2977   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2978       || sock->addr[0] == '/')
2979     return (open_listen_socket_unix(sock));
2980   else
2981     return (open_listen_socket_network(sock));
2982 } /* }}} int open_listen_socket */
2983
2984 static int close_listen_sockets (void) /* {{{ */
2985 {
2986   size_t i;
2987
2988   for (i = 0; i < listen_fds_num; i++)
2989   {
2990     close (listen_fds[i].fd);
2991
2992     if (listen_fds[i].family == PF_UNIX)
2993       unlink(listen_fds[i].addr);
2994   }
2995
2996   free (listen_fds);
2997   listen_fds = NULL;
2998   listen_fds_num = 0;
2999
3000   return (0);
3001 } /* }}} int close_listen_sockets */
3002
3003 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3004 {
3005   struct pollfd *pollfds;
3006   int pollfds_num;
3007   int status;
3008   int i;
3009
3010   if (listen_fds_num < 1)
3011   {
3012     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3013     return (NULL);
3014   }
3015
3016   pollfds_num = listen_fds_num;
3017   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3018   if (pollfds == NULL)
3019   {
3020     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3021     return (NULL);
3022   }
3023   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3024
3025   RRDD_LOG(LOG_INFO, "listening for connections");
3026
3027   while (state == RUNNING)
3028   {
3029     for (i = 0; i < pollfds_num; i++)
3030     {
3031       pollfds[i].fd = listen_fds[i].fd;
3032       pollfds[i].events = POLLIN | POLLPRI;
3033       pollfds[i].revents = 0;
3034     }
3035
3036     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3037     if (state != RUNNING)
3038       break;
3039     else if (status == 0) /* timeout */
3040       continue;
3041     else if (status < 0) /* error */
3042     {
3043       status = errno;
3044       if (status != EINTR)
3045       {
3046         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3047       }
3048       continue;
3049     }
3050
3051     for (i = 0; i < pollfds_num; i++)
3052     {
3053       listen_socket_t *client_sock;
3054       struct sockaddr_storage client_sa;
3055       socklen_t client_sa_size;
3056       pthread_t tid;
3057       pthread_attr_t attr;
3058
3059       if (pollfds[i].revents == 0)
3060         continue;
3061
3062       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3063       {
3064         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3065             "poll(2) returned something unexpected for listen FD #%i.",
3066             pollfds[i].fd);
3067         continue;
3068       }
3069
3070       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3071       if (client_sock == NULL)
3072       {
3073         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3074         continue;
3075       }
3076       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3077
3078       client_sa_size = sizeof (client_sa);
3079       client_sock->fd = accept (pollfds[i].fd,
3080           (struct sockaddr *) &client_sa, &client_sa_size);
3081       if (client_sock->fd < 0)
3082       {
3083         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3084         free(client_sock);
3085         continue;
3086       }
3087
3088       pthread_attr_init (&attr);
3089       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3090
3091       status = pthread_create (&tid, &attr, connection_thread_main,
3092                                client_sock);
3093       if (status != 0)
3094       {
3095         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3096         close_connection(client_sock);
3097         continue;
3098       }
3099     } /* for (pollfds_num) */
3100   } /* while (state == RUNNING) */
3101
3102   RRDD_LOG(LOG_INFO, "starting shutdown");
3103
3104   close_listen_sockets ();
3105
3106   pthread_mutex_lock (&connection_threads_lock);
3107   while (connection_threads_num > 0)
3108     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3109   pthread_mutex_unlock (&connection_threads_lock);
3110
3111   free(pollfds);
3112
3113   return (NULL);
3114 } /* }}} void *listen_thread_main */
3115
3116 static int daemonize (void) /* {{{ */
3117 {
3118   int pid_fd;
3119   char *base_dir;
3120
3121   daemon_uid = geteuid();
3122
3123   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3124   if (pid_fd < 0)
3125     pid_fd = check_pidfile();
3126   if (pid_fd < 0)
3127     return pid_fd;
3128
3129   /* open all the listen sockets */
3130   if (config_listen_address_list_len > 0)
3131   {
3132     for (size_t i = 0; i < config_listen_address_list_len; i++)
3133       open_listen_socket (config_listen_address_list[i]);
3134
3135     rrd_free_ptrs((void ***) &config_listen_address_list,
3136                   &config_listen_address_list_len);
3137   }
3138   else
3139   {
3140     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3141         sizeof(default_socket.addr) - 1);
3142     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3143
3144     if (default_socket.permissions == 0)
3145       socket_permission_set_all (&default_socket);
3146
3147     open_listen_socket (&default_socket);
3148   }
3149
3150   if (listen_fds_num < 1)
3151   {
3152     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3153     goto error;
3154   }
3155
3156   if (!stay_foreground)
3157   {
3158     pid_t child;
3159
3160     child = fork ();
3161     if (child < 0)
3162     {
3163       fprintf (stderr, "daemonize: fork(2) failed.\n");
3164       goto error;
3165     }
3166     else if (child > 0)
3167       exit(0);
3168
3169     /* Become session leader */
3170     setsid ();
3171
3172     /* Open the first three file descriptors to /dev/null */
3173     close (2);
3174     close (1);
3175     close (0);
3176
3177     open ("/dev/null", O_RDWR);
3178     if (dup(0) == -1 || dup(0) == -1){
3179         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3180     }
3181   } /* if (!stay_foreground) */
3182
3183   /* Change into the /tmp directory. */
3184   base_dir = (config_base_dir != NULL)
3185     ? config_base_dir
3186     : "/tmp";
3187
3188   if (chdir (base_dir) != 0)
3189   {
3190     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3191     goto error;
3192   }
3193
3194   install_signal_handlers();
3195
3196   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3197   RRDD_LOG(LOG_INFO, "starting up");
3198
3199   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3200                                 (GDestroyNotify) free_cache_item);
3201   if (cache_tree == NULL)
3202   {
3203     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3204     goto error;
3205   }
3206
3207   return write_pidfile (pid_fd);
3208
3209 error:
3210   remove_pidfile();
3211   return -1;
3212 } /* }}} int daemonize */
3213
3214 static int cleanup (void) /* {{{ */
3215 {
3216   pthread_cond_broadcast (&flush_cond);
3217   pthread_join (flush_thread, NULL);
3218
3219   pthread_cond_broadcast (&queue_cond);
3220   for (int i = 0; i < config_queue_threads; i++)
3221     pthread_join (queue_threads[i], NULL);
3222
3223   if (config_flush_at_shutdown)
3224   {
3225     assert(cache_queue_head == NULL);
3226     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3227   }
3228
3229   free(queue_threads);
3230   free(config_base_dir);
3231
3232   pthread_mutex_lock(&cache_lock);
3233   g_tree_destroy(cache_tree);
3234
3235   pthread_mutex_lock(&journal_lock);
3236   journal_done();
3237
3238   RRDD_LOG(LOG_INFO, "goodbye");
3239   closelog ();
3240
3241   remove_pidfile ();
3242   free(config_pid_file);
3243
3244   return (0);
3245 } /* }}} int cleanup */
3246
3247 static int read_options (int argc, char **argv) /* {{{ */
3248 {
3249   int option;
3250   int status = 0;
3251
3252   socket_permission_clear (&default_socket);
3253
3254   default_socket.socket_group = (gid_t)-1;
3255   default_socket.socket_permissions = (mode_t)-1;
3256
3257   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3258   {
3259     switch (option)
3260     {
3261       case 'O':
3262         opt_no_overwrite = 1;
3263         break;
3264
3265       case 'g':
3266         stay_foreground=1;
3267         break;
3268
3269       case 'l':
3270       {
3271         listen_socket_t *new;
3272
3273         new = malloc(sizeof(listen_socket_t));
3274         if (new == NULL)
3275         {
3276           fprintf(stderr, "read_options: malloc failed.\n");
3277           return(2);
3278         }
3279         memset(new, 0, sizeof(listen_socket_t));
3280
3281         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3282
3283         /* Add permissions to the socket {{{ */
3284         if (default_socket.permissions != 0)
3285         {
3286           socket_permission_copy (new, &default_socket);
3287         }
3288         else /* if (default_socket.permissions == 0) */
3289         {
3290           /* Add permission for ALL commands to the socket. */
3291           socket_permission_set_all (new);
3292         }
3293         /* }}} Done adding permissions. */
3294
3295         new->socket_group = default_socket.socket_group;
3296         new->socket_permissions = default_socket.socket_permissions;
3297
3298         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3299                          &config_listen_address_list_len, new))
3300         {
3301           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3302           return (2);
3303         }
3304       }
3305       break;
3306
3307       /* set socket group permissions */
3308       case 's':
3309       {
3310         gid_t group_gid;
3311         struct group *grp;
3312
3313         group_gid = strtoul(optarg, NULL, 10);
3314         if (errno != EINVAL && group_gid>0)
3315         {
3316           /* we were passed a number */
3317           grp = getgrgid(group_gid);
3318         }
3319         else
3320         {
3321           grp = getgrnam(optarg);
3322         }
3323
3324         if (grp)
3325         {
3326           default_socket.socket_group = grp->gr_gid;
3327         }
3328         else
3329         {
3330           /* no idea what the user wanted... */
3331           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3332           return (5);
3333         }
3334       }
3335       break;
3336
3337       /* set socket file permissions */
3338       case 'm':
3339       {
3340         long  tmp;
3341         char *endptr = NULL;
3342
3343         tmp = strtol (optarg, &endptr, 8);
3344         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3345             || (tmp > 07777) || (tmp < 0)) {
3346           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3347               optarg);
3348           return (5);
3349         }
3350
3351         default_socket.socket_permissions = (mode_t)tmp;
3352       }
3353       break;
3354
3355       case 'P':
3356       {
3357         char *optcopy;
3358         char *saveptr;
3359         char *dummy;
3360         char *ptr;
3361
3362         socket_permission_clear (&default_socket);
3363
3364         optcopy = strdup (optarg);
3365         dummy = optcopy;
3366         saveptr = NULL;
3367         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3368         {
3369           dummy = NULL;
3370           status = socket_permission_add (&default_socket, ptr);
3371           if (status != 0)
3372           {
3373             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3374                 "socket failed. Most likely, this permission doesn't "
3375                 "exist. Check your command line.\n", ptr);
3376             status = 4;
3377           }
3378         }
3379
3380         free (optcopy);
3381       }
3382       break;
3383
3384       case 'f':
3385       {
3386         int temp;
3387
3388         temp = atoi (optarg);
3389         if (temp > 0)
3390           config_flush_interval = temp;
3391         else
3392         {
3393           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3394           status = 3;
3395         }
3396       }
3397       break;
3398
3399       case 'w':
3400       {
3401         int temp;
3402
3403         temp = atoi (optarg);
3404         if (temp > 0)
3405           config_write_interval = temp;
3406         else
3407         {
3408           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3409           status = 2;
3410         }
3411       }
3412       break;
3413
3414       case 'z':
3415       {
3416         int temp;
3417
3418         temp = atoi(optarg);
3419         if (temp > 0)
3420           config_write_jitter = temp;
3421         else
3422         {
3423           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3424           status = 2;
3425         }
3426
3427         break;
3428       }
3429
3430       case 't':
3431       {
3432         int threads;
3433         threads = atoi(optarg);
3434         if (threads >= 1)
3435           config_queue_threads = threads;
3436         else
3437         {
3438           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3439           return 1;
3440         }
3441       }
3442       break;
3443
3444       case 'B':
3445         config_write_base_only = 1;
3446         break;
3447
3448       case 'b':
3449       {
3450         size_t len;
3451         char base_realpath[PATH_MAX];
3452
3453         if (config_base_dir != NULL)
3454           free (config_base_dir);
3455         config_base_dir = strdup (optarg);
3456         if (config_base_dir == NULL)
3457         {
3458           fprintf (stderr, "read_options: strdup failed.\n");
3459           return (3);
3460         }
3461
3462         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3463         {
3464           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3465               config_base_dir, rrd_strerror (errno));
3466           return (3);
3467         }
3468
3469         /* make sure that the base directory is not resolved via
3470          * symbolic links.  this makes some performance-enhancing
3471          * assumptions possible (we don't have to resolve paths
3472          * that start with a "/")
3473          */
3474         if (realpath(config_base_dir, base_realpath) == NULL)
3475         {
3476           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3477               "%s\n", config_base_dir, rrd_strerror(errno));
3478           return 5;
3479         }
3480
3481         len = strlen (config_base_dir);
3482         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3483         {
3484           config_base_dir[len - 1] = 0;
3485           len--;
3486         }
3487
3488         if (len < 1)
3489         {
3490           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3491           return (4);
3492         }
3493
3494         _config_base_dir_len = len;
3495
3496         len = strlen (base_realpath);
3497         while ((len > 0) && (base_realpath[len - 1] == '/'))
3498         {
3499           base_realpath[len - 1] = '\0';
3500           len--;
3501         }
3502
3503         if (strncmp(config_base_dir,
3504                          base_realpath, sizeof(base_realpath)) != 0)
3505         {
3506           fprintf(stderr,
3507                   "Base directory (-b) resolved via file system links!\n"
3508                   "Please consult rrdcached '-b' documentation!\n"
3509                   "Consider specifying the real directory (%s)\n",
3510                   base_realpath);
3511           return 5;
3512         }
3513       }
3514       break;
3515
3516       case 'p':
3517       {
3518         if (config_pid_file != NULL)
3519           free (config_pid_file);
3520         config_pid_file = strdup (optarg);
3521         if (config_pid_file == NULL)
3522         {
3523           fprintf (stderr, "read_options: strdup failed.\n");
3524           return (3);
3525         }
3526       }
3527       break;
3528
3529       case 'F':
3530         config_flush_at_shutdown = 1;
3531         break;
3532
3533       case 'j':
3534       {
3535         char journal_dir_actual[PATH_MAX];
3536         const char *dir;
3537         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3538
3539         status = rrd_mkdir_p(dir, 0777);
3540         if (status != 0)
3541         {
3542           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3543               dir, rrd_strerror(errno));
3544           return 6;
3545         }
3546
3547         if (access(dir, R_OK|W_OK|X_OK) != 0)
3548         {
3549           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3550                   errno ? rrd_strerror(errno) : "");
3551           return 6;
3552         }
3553       }
3554       break;
3555
3556       case 'a':
3557       {
3558         int temp = atoi(optarg);
3559         if (temp > 0)
3560           config_alloc_chunk = temp;
3561         else
3562         {
3563           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3564           return 10;
3565         }
3566       }
3567       break;
3568
3569       case 'h':
3570       case '?':
3571         printf ("RRDCacheD %s\n"
3572             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3573             "\n"
3574             "Usage: rrdcached [options]\n"
3575             "\n"
3576             "Valid options are:\n"
3577             "  -l <address>  Socket address to listen to.\n"
3578             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3579             "  -P <perms>    Sets the permissions to assign to all following "
3580                             "sockets\n"
3581             "  -w <seconds>  Interval in which to write data.\n"
3582             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3583             "  -t <threads>  Number of write threads.\n"
3584             "  -f <seconds>  Interval in which to flush dead data.\n"
3585             "  -p <file>     Location of the PID-file.\n"
3586             "  -b <dir>      Base directory to change to.\n"
3587             "  -B            Restrict file access to paths within -b <dir>\n"
3588             "  -g            Do not fork and run in the foreground.\n"
3589             "  -j <dir>      Directory in which to create the journal files.\n"
3590             "  -F            Always flush all updates at shutdown\n"
3591             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3592             "                (the socket will also have read/write permissions "
3593                             "for that group)\n"
3594             "  -m <mode>     File permissions (octal) of all following UNIX "
3595                             "sockets\n"
3596             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3597             "  -O            Do not allow CREATE commands to overwrite existing\n"
3598             "                files, even if asked to.\n"
3599             "\n"
3600             "For more information and a detailed description of all options "
3601             "please refer\n"
3602             "to the rrdcached(1) manual page.\n",
3603             VERSION);
3604         if (option == 'h')
3605           status = -1;
3606         else
3607           status = 1;
3608         break;
3609     } /* switch (option) */
3610   } /* while (getopt) */
3611
3612   /* advise the user when values are not sane */
3613   if (config_flush_interval < 2 * config_write_interval)
3614     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3615             " 2x write interval (-w) !\n");
3616   if (config_write_jitter > config_write_interval)
3617     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3618             " write interval (-w) !\n");
3619
3620   if (config_write_base_only && config_base_dir == NULL)
3621     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3622             "  Consult the rrdcached documentation\n");
3623
3624   if (journal_dir == NULL)
3625     config_flush_at_shutdown = 1;
3626
3627   return (status);
3628 } /* }}} int read_options */
3629
3630 int main (int argc, char **argv)
3631 {
3632   int status;
3633
3634   status = read_options (argc, argv);
3635   if (status != 0)
3636   {
3637     if (status < 0)
3638       status = 0;
3639     return (status);
3640   }
3641
3642   status = daemonize ();
3643   if (status != 0)
3644   {
3645     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3646     return (1);
3647   }
3648
3649   journal_init();
3650
3651   /* start the queue threads */
3652   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3653   if (queue_threads == NULL)
3654   {
3655     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3656     cleanup();
3657     return (1);
3658   }
3659   for (int i = 0; i < config_queue_threads; i++)
3660   {
3661     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3662     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3663     if (status != 0)
3664     {
3665       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3666       cleanup();
3667       return (1);
3668     }
3669   }
3670
3671   /* start the flush thread */
3672   memset(&flush_thread, 0, sizeof(flush_thread));
3673   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3674   if (status != 0)
3675   {
3676     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3677     cleanup();
3678     return (1);
3679   }
3680
3681   listen_thread_main (NULL);
3682   cleanup ();
3683
3684   return (0);
3685 } /* int main */
3686
3687 /*
3688  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3689  */