The buffer length for command buffers should be controlled by a single
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
115
116 #include <glib-2.0/glib.h>
117 /* }}} */
118
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
126
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
131
132 struct listen_socket_s
133 {
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
137
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
141
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
146
147   char *wbuf;
148   ssize_t wbuf_len;
149
150   uint32_t permissions;
151
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
156
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
164
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
167
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
171
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
177
178   char *syntax;
179   char *help;
180 };
181
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
199
200 struct callback_flush_data_s
201 {
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
208
209 enum queue_side_e
210 {
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
215
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
221
222 #define RBUF_SIZE (RRD_CMD_MAX*2)
223
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
229
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
232
233 static listen_socket_t default_socket;
234
235 enum {
236   RUNNING,              /* normal operation */
237   FLUSHING,             /* flushing remaining values */
238   SHUTDOWN              /* shutting down */
239 } state = RUNNING;
240
241 static pthread_t *queue_threads;
242 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
243 static int config_queue_threads = 4;
244
245 static pthread_t flush_thread;
246 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
247
248 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
249 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
250 static int connection_threads_num = 0;
251
252 /* Cache stuff */
253 static GTree          *cache_tree = NULL;
254 static cache_item_t   *cache_queue_head = NULL;
255 static cache_item_t   *cache_queue_tail = NULL;
256 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 static int config_write_interval = 300;
259 static int config_write_jitter   = 0;
260 static int config_flush_interval = 3600;
261 static int config_flush_at_shutdown = 0;
262 static char *config_pid_file = NULL;
263 static char *config_base_dir = NULL;
264 static size_t _config_base_dir_len = 0;
265 static int config_write_base_only = 0;
266 static size_t config_alloc_chunk = 1;
267
268 static listen_socket_t **config_listen_address_list = NULL;
269 static size_t config_listen_address_list_len = 0;
270
271 static uint64_t stats_queue_length = 0;
272 static uint64_t stats_updates_received = 0;
273 static uint64_t stats_flush_received = 0;
274 static uint64_t stats_updates_written = 0;
275 static uint64_t stats_data_sets_written = 0;
276 static uint64_t stats_journal_bytes = 0;
277 static uint64_t stats_journal_rotate = 0;
278 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
279
280 static int opt_no_overwrite = 0; /* default for the daemon */
281
282 /* Journaled updates */
283 #define JOURNAL_REPLAY(s) ((s) == NULL)
284 #define JOURNAL_BASE "rrd.journal"
285 static journal_set *journal_cur = NULL;
286 static journal_set *journal_old = NULL;
287 static char *journal_dir = NULL;
288 static FILE *journal_fh = NULL;         /* current journal file handle */
289 static long  journal_size = 0;          /* current journal size */
290 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
291 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
292 static int journal_write(char *cmd, char *args);
293 static void journal_done(void);
294 static void journal_rotate(void);
295
296 /* prototypes for forward refernces */
297 static int handle_request_help (HANDLER_PROTO);
298
299 /* 
300  * Functions
301  */
302 static void sig_common (const char *sig) /* {{{ */
303 {
304   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
305   state = FLUSHING;
306   pthread_cond_broadcast(&flush_cond);
307   pthread_cond_broadcast(&queue_cond);
308 } /* }}} void sig_common */
309
310 static void sig_int_handler (int UNUSED(s)) /* {{{ */
311 {
312   sig_common("INT");
313 } /* }}} void sig_int_handler */
314
315 static void sig_term_handler (int UNUSED(s)) /* {{{ */
316 {
317   sig_common("TERM");
318 } /* }}} void sig_term_handler */
319
320 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
321 {
322   config_flush_at_shutdown = 1;
323   sig_common("USR1");
324 } /* }}} void sig_usr1_handler */
325
326 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
327 {
328   config_flush_at_shutdown = 0;
329   sig_common("USR2");
330 } /* }}} void sig_usr2_handler */
331
332 static void install_signal_handlers(void) /* {{{ */
333 {
334   /* These structures are static, because `sigaction' behaves weird if the are
335    * overwritten.. */
336   static struct sigaction sa_int;
337   static struct sigaction sa_term;
338   static struct sigaction sa_pipe;
339   static struct sigaction sa_usr1;
340   static struct sigaction sa_usr2;
341
342   /* Install signal handlers */
343   memset (&sa_int, 0, sizeof (sa_int));
344   sa_int.sa_handler = sig_int_handler;
345   sigaction (SIGINT, &sa_int, NULL);
346
347   memset (&sa_term, 0, sizeof (sa_term));
348   sa_term.sa_handler = sig_term_handler;
349   sigaction (SIGTERM, &sa_term, NULL);
350
351   memset (&sa_pipe, 0, sizeof (sa_pipe));
352   sa_pipe.sa_handler = SIG_IGN;
353   sigaction (SIGPIPE, &sa_pipe, NULL);
354
355   memset (&sa_pipe, 0, sizeof (sa_usr1));
356   sa_usr1.sa_handler = sig_usr1_handler;
357   sigaction (SIGUSR1, &sa_usr1, NULL);
358
359   memset (&sa_usr2, 0, sizeof (sa_usr2));
360   sa_usr2.sa_handler = sig_usr2_handler;
361   sigaction (SIGUSR2, &sa_usr2, NULL);
362
363 } /* }}} void install_signal_handlers */
364
365 static int open_pidfile(char *action, int oflag) /* {{{ */
366 {
367   int fd;
368   const char *file;
369   char *file_copy, *dir;
370
371   file = (config_pid_file != NULL)
372     ? config_pid_file
373     : LOCALSTATEDIR "/run/rrdcached.pid";
374
375   /* dirname may modify its argument */
376   file_copy = strdup(file);
377   if (file_copy == NULL)
378   {
379     fprintf(stderr, "rrdcached: strdup(): %s\n",
380         rrd_strerror(errno));
381     return -1;
382   }
383
384   dir = dirname(file_copy);
385   if (rrd_mkdir_p(dir, 0777) != 0)
386   {
387     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
388         dir, rrd_strerror(errno));
389     return -1;
390   }
391
392   free(file_copy);
393
394   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
395   if (fd < 0)
396     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
397             action, file, rrd_strerror(errno));
398
399   return(fd);
400 } /* }}} static int open_pidfile */
401
402 /* check existing pid file to see whether a daemon is running */
403 static int check_pidfile(void)
404 {
405   int pid_fd;
406   pid_t pid;
407   char pid_str[16];
408
409   pid_fd = open_pidfile("open", O_RDWR);
410   if (pid_fd < 0)
411     return pid_fd;
412
413   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
414     return -1;
415
416   pid = atoi(pid_str);
417   if (pid <= 0)
418     return -1;
419
420   /* another running process that we can signal COULD be
421    * a competing rrdcached */
422   if (pid != getpid() && kill(pid, 0) == 0)
423   {
424     fprintf(stderr,
425             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
426     close(pid_fd);
427     return -1;
428   }
429
430   lseek(pid_fd, 0, SEEK_SET);
431   if (ftruncate(pid_fd, 0) == -1)
432   {
433     fprintf(stderr,
434             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
435     close(pid_fd);
436     return -1;
437   }
438
439   fprintf(stderr,
440           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
441           "rrdcached: starting normally.\n", pid);
442
443   return pid_fd;
444 } /* }}} static int check_pidfile */
445
446 static int write_pidfile (int fd) /* {{{ */
447 {
448   pid_t pid;
449   FILE *fh;
450
451   pid = getpid ();
452
453   fh = fdopen (fd, "w");
454   if (fh == NULL)
455   {
456     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
457     close(fd);
458     return (-1);
459   }
460
461   fprintf (fh, "%i\n", (int) pid);
462   fclose (fh);
463
464   return (0);
465 } /* }}} int write_pidfile */
466
467 static int remove_pidfile (void) /* {{{ */
468 {
469   char *file;
470   int status;
471
472   file = (config_pid_file != NULL)
473     ? config_pid_file
474     : LOCALSTATEDIR "/run/rrdcached.pid";
475
476   status = unlink (file);
477   if (status == 0)
478     return (0);
479   return (errno);
480 } /* }}} int remove_pidfile */
481
482 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
483 {
484   char *eol;
485
486   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
487                sock->next_read - sock->next_cmd);
488
489   if (eol == NULL)
490   {
491     /* no commands left, move remainder back to front of rbuf */
492     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
493             sock->next_read - sock->next_cmd);
494     sock->next_read -= sock->next_cmd;
495     sock->next_cmd = 0;
496     *len = 0;
497     return NULL;
498   }
499   else
500   {
501     char *cmd = sock->rbuf + sock->next_cmd;
502     *eol = '\0';
503
504     sock->next_cmd = eol - sock->rbuf + 1;
505
506     if (eol > sock->rbuf && *(eol-1) == '\r')
507       *(--eol) = '\0'; /* handle "\r\n" EOL */
508
509     *len = eol - cmd;
510
511     return cmd;
512   }
513
514   /* NOTREACHED */
515   assert(1==0);
516 } /* }}} char *next_cmd */
517
518 /* add the characters directly to the write buffer */
519 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
520 {
521   char *new_buf;
522
523   assert(sock != NULL);
524
525   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526   if (new_buf == NULL)
527   {
528     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
529     return -1;
530   }
531
532   strncpy(new_buf + sock->wbuf_len, str, len + 1);
533
534   sock->wbuf = new_buf;
535   sock->wbuf_len += len;
536
537   return 0;
538 } /* }}} static int add_to_wbuf */
539
540 /* add the text to the "extra" info that's sent after the status line */
541 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
542 {
543   va_list argp;
544   char buffer[RRD_CMD_MAX];
545   int len;
546
547   if (JOURNAL_REPLAY(sock)) return 0;
548   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
549
550   va_start(argp, fmt);
551 #ifdef HAVE_VSNPRINTF
552   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
553 #else
554   len = vsprintf(buffer, fmt, argp);
555 #endif
556   va_end(argp);
557   if (len < 0)
558   {
559     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
560     return -1;
561   }
562
563   return add_to_wbuf(sock, buffer, len);
564 } /* }}} static int add_response_info */
565
566 static int count_lines(char *str) /* {{{ */
567 {
568   int lines = 0;
569
570   if (str != NULL)
571   {
572     while ((str = strchr(str, '\n')) != NULL)
573     {
574       ++lines;
575       ++str;
576     }
577   }
578
579   return lines;
580 } /* }}} static int count_lines */
581
582 /* send the response back to the user.
583  * returns 0 on success, -1 on error
584  * write buffer is always zeroed after this call */
585 static int send_response (listen_socket_t *sock, response_code rc,
586                           char *fmt, ...) /* {{{ */
587 {
588   va_list argp;
589   char buffer[RRD_CMD_MAX];
590   int lines;
591   ssize_t wrote;
592   int rclen, len;
593
594   if (JOURNAL_REPLAY(sock)) return rc;
595
596   if (sock->batch_start)
597   {
598     if (rc == RESP_OK)
599       return rc; /* no response on success during BATCH */
600     lines = sock->batch_cmd;
601   }
602   else if (rc == RESP_OK)
603     lines = count_lines(sock->wbuf);
604   else
605     lines = -1;
606
607   rclen = sprintf(buffer, "%d ", lines);
608   va_start(argp, fmt);
609 #ifdef HAVE_VSNPRINTF
610   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
611 #else
612   len = vsprintf(buffer+rclen, fmt, argp);
613 #endif
614   va_end(argp);
615   if (len < 0)
616     return -1;
617
618   len += rclen;
619
620   /* append the result to the wbuf, don't write to the user */
621   if (sock->batch_start)
622     return add_to_wbuf(sock, buffer, len);
623
624   /* first write must be complete */
625   if (len != write(sock->fd, buffer, len))
626   {
627     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
628     return -1;
629   }
630
631   if (sock->wbuf != NULL && rc == RESP_OK)
632   {
633     wrote = 0;
634     while (wrote < sock->wbuf_len)
635     {
636       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637       if (wb <= 0)
638       {
639         RRDD_LOG(LOG_INFO, "send_response: could not write results");
640         return -1;
641       }
642       wrote += wb;
643     }
644   }
645
646   free(sock->wbuf); sock->wbuf = NULL;
647   sock->wbuf_len = 0;
648
649   return 0;
650 } /* }}} */
651
652 static void wipe_ci_values(cache_item_t *ci, time_t when)
653 {
654   ci->values = NULL;
655   ci->values_num = 0;
656   ci->values_alloc = 0;
657
658   ci->last_flush_time = when;
659   if (config_write_jitter > 0)
660     ci->last_flush_time += (rrd_random() % config_write_jitter);
661 }
662
663 /* remove_from_queue
664  * remove a "cache_item_t" item from the queue.
665  * must hold 'cache_lock' when calling this
666  */
667 static void remove_from_queue(cache_item_t *ci) /* {{{ */
668 {
669   if (ci == NULL) return;
670   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
671
672   if (ci->prev == NULL)
673     cache_queue_head = ci->next; /* reset head */
674   else
675     ci->prev->next = ci->next;
676
677   if (ci->next == NULL)
678     cache_queue_tail = ci->prev; /* reset the tail */
679   else
680     ci->next->prev = ci->prev;
681
682   ci->next = ci->prev = NULL;
683   ci->flags &= ~CI_FLAGS_IN_QUEUE;
684
685   pthread_mutex_lock (&stats_lock);
686   assert (stats_queue_length > 0);
687   stats_queue_length--;
688   pthread_mutex_unlock (&stats_lock);
689
690 } /* }}} static void remove_from_queue */
691
692 /* free the resources associated with the cache_item_t
693  * must hold cache_lock when calling this function
694  */
695 static void *free_cache_item(cache_item_t *ci) /* {{{ */
696 {
697   if (ci == NULL) return NULL;
698
699   remove_from_queue(ci);
700
701   for (size_t i=0; i < ci->values_num; i++)
702     free(ci->values[i]);
703
704   free (ci->values);
705   free (ci->file);
706
707   /* in case anyone is waiting */
708   pthread_cond_broadcast(&ci->flushed);
709   pthread_cond_destroy(&ci->flushed);
710
711   free (ci);
712
713   return NULL;
714 } /* }}} static void *free_cache_item */
715
716 /*
717  * enqueue_cache_item:
718  * `cache_lock' must be acquired before calling this function!
719  */
720 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
721     queue_side_t side)
722 {
723   if (ci == NULL)
724     return (-1);
725
726   if (ci->values_num == 0)
727     return (0);
728
729   if (side == HEAD)
730   {
731     if (cache_queue_head == ci)
732       return 0;
733
734     /* remove if further down in queue */
735     remove_from_queue(ci);
736
737     ci->prev = NULL;
738     ci->next = cache_queue_head;
739     if (ci->next != NULL)
740       ci->next->prev = ci;
741     cache_queue_head = ci;
742
743     if (cache_queue_tail == NULL)
744       cache_queue_tail = cache_queue_head;
745   }
746   else /* (side == TAIL) */
747   {
748     /* We don't move values back in the list.. */
749     if (ci->flags & CI_FLAGS_IN_QUEUE)
750       return (0);
751
752     assert (ci->next == NULL);
753     assert (ci->prev == NULL);
754
755     ci->prev = cache_queue_tail;
756
757     if (cache_queue_tail == NULL)
758       cache_queue_head = ci;
759     else
760       cache_queue_tail->next = ci;
761
762     cache_queue_tail = ci;
763   }
764
765   ci->flags |= CI_FLAGS_IN_QUEUE;
766
767   pthread_cond_signal(&queue_cond);
768   pthread_mutex_lock (&stats_lock);
769   stats_queue_length++;
770   pthread_mutex_unlock (&stats_lock);
771
772   return (0);
773 } /* }}} int enqueue_cache_item */
774
775 /*
776  * tree_callback_flush:
777  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
778  * while this is in progress.
779  */
780 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
781     gpointer data)
782 {
783   cache_item_t *ci;
784   callback_flush_data_t *cfd;
785
786   ci = (cache_item_t *) value;
787   cfd = (callback_flush_data_t *) data;
788
789   if (ci->flags & CI_FLAGS_IN_QUEUE)
790     return FALSE;
791
792   if (ci->values_num > 0
793       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
794   {
795     enqueue_cache_item (ci, TAIL);
796   }
797   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
798       && (ci->values_num <= 0))
799   {
800     assert ((char *) key == ci->file);
801     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
802     {
803       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
804       return (FALSE);
805     }
806   }
807
808   return (FALSE);
809 } /* }}} gboolean tree_callback_flush */
810
811 static int flush_old_values (int max_age)
812 {
813   callback_flush_data_t cfd;
814   size_t k;
815
816   memset (&cfd, 0, sizeof (cfd));
817   /* Pass the current time as user data so that we don't need to call
818    * `time' for each node. */
819   cfd.now = time (NULL);
820   cfd.keys = NULL;
821   cfd.keys_num = 0;
822
823   if (max_age > 0)
824     cfd.abs_timeout = cfd.now - max_age;
825   else
826     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
827
828   /* `tree_callback_flush' will return the keys of all values that haven't
829    * been touched in the last `config_flush_interval' seconds in `cfd'.
830    * The char*'s in this array point to the same memory as ci->file, so we
831    * don't need to free them separately. */
832   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
833
834   for (k = 0; k < cfd.keys_num; k++)
835   {
836     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
837     /* should never fail, since we have held the cache_lock
838      * the entire time */
839     assert(status == TRUE);
840   }
841
842   if (cfd.keys != NULL)
843   {
844     free (cfd.keys);
845     cfd.keys = NULL;
846   }
847
848   return (0);
849 } /* int flush_old_values */
850
851 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
852 {
853   struct timeval now;
854   struct timespec next_flush;
855   int status;
856
857   gettimeofday (&now, NULL);
858   next_flush.tv_sec = now.tv_sec + config_flush_interval;
859   next_flush.tv_nsec = 1000 * now.tv_usec;
860
861   pthread_mutex_lock(&cache_lock);
862
863   while (state == RUNNING)
864   {
865     gettimeofday (&now, NULL);
866     if ((now.tv_sec > next_flush.tv_sec)
867         || ((now.tv_sec == next_flush.tv_sec)
868           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
869     {
870       RRDD_LOG(LOG_DEBUG, "flushing old values");
871
872       /* Determine the time of the next cache flush. */
873       next_flush.tv_sec = now.tv_sec + config_flush_interval;
874
875       /* Flush all values that haven't been written in the last
876        * `config_write_interval' seconds. */
877       flush_old_values (config_write_interval);
878
879       /* unlock the cache while we rotate so we don't block incoming
880        * updates if the fsync() blocks on disk I/O */
881       pthread_mutex_unlock(&cache_lock);
882       journal_rotate();
883       pthread_mutex_lock(&cache_lock);
884     }
885
886     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
887     if (status != 0 && status != ETIMEDOUT)
888     {
889       RRDD_LOG (LOG_ERR, "flush_thread_main: "
890                 "pthread_cond_timedwait returned %i.", status);
891     }
892   }
893
894   if (config_flush_at_shutdown)
895     flush_old_values (-1); /* flush everything */
896
897   state = SHUTDOWN;
898
899   pthread_mutex_unlock(&cache_lock);
900
901   return NULL;
902 } /* void *flush_thread_main */
903
904 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
905 {
906   pthread_mutex_lock (&cache_lock);
907
908   while (state != SHUTDOWN
909          || (cache_queue_head != NULL && config_flush_at_shutdown))
910   {
911     cache_item_t *ci;
912     char *file;
913     char **values;
914     size_t values_num;
915     int status;
916
917     /* Now, check if there's something to store away. If not, wait until
918      * something comes in. */
919     if (cache_queue_head == NULL)
920     {
921       status = pthread_cond_wait (&queue_cond, &cache_lock);
922       if ((status != 0) && (status != ETIMEDOUT))
923       {
924         RRDD_LOG (LOG_ERR, "queue_thread_main: "
925             "pthread_cond_wait returned %i.", status);
926       }
927     }
928
929     /* Check if a value has arrived. This may be NULL if we timed out or there
930      * was an interrupt such as a signal. */
931     if (cache_queue_head == NULL)
932       continue;
933
934     ci = cache_queue_head;
935
936     /* copy the relevant parts */
937     file = strdup (ci->file);
938     if (file == NULL)
939     {
940       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
941       continue;
942     }
943
944     assert(ci->values != NULL);
945     assert(ci->values_num > 0);
946
947     values = ci->values;
948     values_num = ci->values_num;
949
950     wipe_ci_values(ci, time(NULL));
951     remove_from_queue(ci);
952
953     pthread_mutex_unlock (&cache_lock);
954
955     rrd_clear_error ();
956     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957     if (status != 0)
958     {
959       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
960           "rrd_update_r (%s) failed with status %i. (%s)",
961           file, status, rrd_get_error());
962     }
963
964     journal_write("wrote", file);
965
966     /* Search again in the tree.  It's possible someone issued a "FORGET"
967      * while we were writing the update values. */
968     pthread_mutex_lock(&cache_lock);
969     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
970     if (ci)
971       pthread_cond_broadcast(&ci->flushed);
972     pthread_mutex_unlock(&cache_lock);
973
974     if (status == 0)
975     {
976       pthread_mutex_lock (&stats_lock);
977       stats_updates_written++;
978       stats_data_sets_written += values_num;
979       pthread_mutex_unlock (&stats_lock);
980     }
981
982     rrd_free_ptrs((void ***) &values, &values_num);
983     free(file);
984
985     pthread_mutex_lock (&cache_lock);
986   }
987   pthread_mutex_unlock (&cache_lock);
988
989   return (NULL);
990 } /* }}} void *queue_thread_main */
991
992 static int buffer_get_field (char **buffer_ret, /* {{{ */
993     size_t *buffer_size_ret, char **field_ret)
994 {
995   char *buffer;
996   size_t buffer_pos;
997   size_t buffer_size;
998   char *field;
999   size_t field_size;
1000   int status;
1001
1002   buffer = *buffer_ret;
1003   buffer_pos = 0;
1004   buffer_size = *buffer_size_ret;
1005   field = *buffer_ret;
1006   field_size = 0;
1007
1008   if (buffer_size <= 0)
1009     return (-1);
1010
1011   /* This is ensured by `handle_request'. */
1012   assert (buffer[buffer_size - 1] == '\0');
1013
1014   status = -1;
1015   while (buffer_pos < buffer_size)
1016   {
1017     /* Check for end-of-field or end-of-buffer */
1018     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1019     {
1020       field[field_size] = 0;
1021       field_size++;
1022       buffer_pos++;
1023       status = 0;
1024       break;
1025     }
1026     /* Handle escaped characters. */
1027     else if (buffer[buffer_pos] == '\\')
1028     {
1029       if (buffer_pos >= (buffer_size - 1))
1030         break;
1031       buffer_pos++;
1032       field[field_size] = buffer[buffer_pos];
1033       field_size++;
1034       buffer_pos++;
1035     }
1036     /* Normal operation */ 
1037     else
1038     {
1039       field[field_size] = buffer[buffer_pos];
1040       field_size++;
1041       buffer_pos++;
1042     }
1043   } /* while (buffer_pos < buffer_size) */
1044
1045   if (status != 0)
1046     return (status);
1047
1048   *buffer_ret = buffer + buffer_pos;
1049   *buffer_size_ret = buffer_size - buffer_pos;
1050   *field_ret = field;
1051
1052   return (0);
1053 } /* }}} int buffer_get_field */
1054
1055 /* if we're restricting writes to the base directory,
1056  * check whether the file falls within the dir
1057  * returns 1 if OK, otherwise 0
1058  */
1059 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1060 {
1061   assert(file != NULL);
1062
1063   if (!config_write_base_only
1064       || JOURNAL_REPLAY(sock)
1065       || config_base_dir == NULL)
1066     return 1;
1067
1068   if (strstr(file, "../") != NULL) goto err;
1069
1070   /* relative paths without "../" are ok */
1071   if (*file != '/') return 1;
1072
1073   /* file must be of the format base + "/" + <1+ char filename> */
1074   if (strlen(file) < _config_base_dir_len + 2) goto err;
1075   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1076   if (*(file + _config_base_dir_len) != '/') goto err;
1077
1078   return 1;
1079
1080 err:
1081   if (sock != NULL && sock->fd >= 0)
1082     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1083
1084   return 0;
1085 } /* }}} static int check_file_access */
1086
1087 /* when using a base dir, convert relative paths to absolute paths.
1088  * if necessary, modifies the "filename" pointer to point
1089  * to the new path created in "tmp".  "tmp" is provided
1090  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1091  *
1092  * this allows us to optimize for the expected case (absolute path)
1093  * with a no-op.
1094  */
1095 static void get_abs_path(char **filename, char *tmp)
1096 {
1097   assert(tmp != NULL);
1098   assert(filename != NULL && *filename != NULL);
1099
1100   if (config_base_dir == NULL || **filename == '/')
1101     return;
1102
1103   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1104   *filename = tmp;
1105 } /* }}} static int get_abs_path */
1106
1107 static int flush_file (const char *filename) /* {{{ */
1108 {
1109   cache_item_t *ci;
1110
1111   pthread_mutex_lock (&cache_lock);
1112
1113   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114   if (ci == NULL)
1115   {
1116     pthread_mutex_unlock (&cache_lock);
1117     return (ENOENT);
1118   }
1119
1120   if (ci->values_num > 0)
1121   {
1122     /* Enqueue at head */
1123     enqueue_cache_item (ci, HEAD);
1124     pthread_cond_wait(&ci->flushed, &cache_lock);
1125   }
1126
1127   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1128    * may have been purged during our cond_wait() */
1129
1130   pthread_mutex_unlock(&cache_lock);
1131
1132   return (0);
1133 } /* }}} int flush_file */
1134
1135 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1136 {
1137   char *err = "Syntax error.\n";
1138
1139   if (cmd && cmd->syntax)
1140     err = cmd->syntax;
1141
1142   return send_response(sock, RESP_ERR, "Usage: %s", err);
1143 } /* }}} static int syntax_error() */
1144
1145 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1146 {
1147   uint64_t copy_queue_length;
1148   uint64_t copy_updates_received;
1149   uint64_t copy_flush_received;
1150   uint64_t copy_updates_written;
1151   uint64_t copy_data_sets_written;
1152   uint64_t copy_journal_bytes;
1153   uint64_t copy_journal_rotate;
1154
1155   uint64_t tree_nodes_number;
1156   uint64_t tree_depth;
1157
1158   pthread_mutex_lock (&stats_lock);
1159   copy_queue_length       = stats_queue_length;
1160   copy_updates_received   = stats_updates_received;
1161   copy_flush_received     = stats_flush_received;
1162   copy_updates_written    = stats_updates_written;
1163   copy_data_sets_written  = stats_data_sets_written;
1164   copy_journal_bytes      = stats_journal_bytes;
1165   copy_journal_rotate     = stats_journal_rotate;
1166   pthread_mutex_unlock (&stats_lock);
1167
1168   pthread_mutex_lock (&cache_lock);
1169   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1170   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1171   pthread_mutex_unlock (&cache_lock);
1172
1173   add_response_info(sock,
1174                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1175   add_response_info(sock,
1176                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1177   add_response_info(sock,
1178                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1179   add_response_info(sock,
1180                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1181   add_response_info(sock,
1182                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1183   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1184   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1185   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1186   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1187
1188   send_response(sock, RESP_OK, "Statistics follow\n");
1189
1190   return (0);
1191 } /* }}} int handle_request_stats */
1192
1193 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1194 {
1195   char *file, file_tmp[PATH_MAX];
1196   int status;
1197
1198   status = buffer_get_field (&buffer, &buffer_size, &file);
1199   if (status != 0)
1200   {
1201     return syntax_error(sock,cmd);
1202   }
1203   else
1204   {
1205     pthread_mutex_lock(&stats_lock);
1206     stats_flush_received++;
1207     pthread_mutex_unlock(&stats_lock);
1208
1209     get_abs_path(&file, file_tmp);
1210     if (!check_file_access(file, sock)) return 0;
1211
1212     status = flush_file (file);
1213     if (status == 0)
1214       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1215     else if (status == ENOENT)
1216     {
1217       /* no file in our tree; see whether it exists at all */
1218       struct stat statbuf;
1219
1220       memset(&statbuf, 0, sizeof(statbuf));
1221       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1222         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1223       else
1224         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1225     }
1226     else if (status < 0)
1227       return send_response(sock, RESP_ERR, "Internal error.\n");
1228     else
1229       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1230   }
1231
1232   /* NOTREACHED */
1233   assert(1==0);
1234 } /* }}} int handle_request_flush */
1235
1236 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1237 {
1238   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1239
1240   pthread_mutex_lock(&cache_lock);
1241   flush_old_values(-1);
1242   pthread_mutex_unlock(&cache_lock);
1243
1244   return send_response(sock, RESP_OK, "Started flush.\n");
1245 } /* }}} static int handle_request_flushall */
1246
1247 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1248 {
1249   int status;
1250   char *file, file_tmp[PATH_MAX];
1251   cache_item_t *ci;
1252
1253   status = buffer_get_field(&buffer, &buffer_size, &file);
1254   if (status != 0)
1255     return syntax_error(sock,cmd);
1256
1257   get_abs_path(&file, file_tmp);
1258
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1266
1267   for (size_t i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1269
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1273
1274 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1275 {
1276   int status;
1277   gboolean found;
1278   char *file, file_tmp[PATH_MAX];
1279
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return syntax_error(sock,cmd);
1283
1284   get_abs_path(&file, file_tmp);
1285   if (!check_file_access(file, sock)) return 0;
1286
1287   pthread_mutex_lock(&cache_lock);
1288   found = g_tree_remove(cache_tree, file);
1289   pthread_mutex_unlock(&cache_lock);
1290
1291   if (found == TRUE)
1292   {
1293     if (!JOURNAL_REPLAY(sock))
1294       journal_write("forget", file);
1295
1296     return send_response(sock, RESP_OK, "Gone!\n");
1297   }
1298   else
1299     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1300
1301   /* NOTREACHED */
1302   assert(1==0);
1303 } /* }}} static int handle_request_forget */
1304
1305 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1306 {
1307   cache_item_t *ci;
1308
1309   pthread_mutex_lock(&cache_lock);
1310
1311   ci = cache_queue_head;
1312   while (ci != NULL)
1313   {
1314     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1315     ci = ci->next;
1316   }
1317
1318   pthread_mutex_unlock(&cache_lock);
1319
1320   return send_response(sock, RESP_OK, "in queue.\n");
1321 } /* }}} int handle_request_queue */
1322
1323 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1324 {
1325   char *file, file_tmp[PATH_MAX];
1326   int values_num = 0;
1327   int status;
1328   char orig_buf[RRD_CMD_MAX];
1329
1330   cache_item_t *ci;
1331
1332   /* save it for the journal later */
1333   if (!JOURNAL_REPLAY(sock))
1334     strncpy(orig_buf, buffer, buffer_size);
1335
1336   status = buffer_get_field (&buffer, &buffer_size, &file);
1337   if (status != 0)
1338     return syntax_error(sock,cmd);
1339
1340   pthread_mutex_lock(&stats_lock);
1341   stats_updates_received++;
1342   pthread_mutex_unlock(&stats_lock);
1343
1344   get_abs_path(&file, file_tmp);
1345   if (!check_file_access(file, sock)) return 0;
1346
1347   pthread_mutex_lock (&cache_lock);
1348   ci = g_tree_lookup (cache_tree, file);
1349
1350   if (ci == NULL) /* {{{ */
1351   {
1352     struct stat statbuf;
1353     cache_item_t *tmp;
1354
1355     /* don't hold the lock while we setup; stat(2) might block */
1356     pthread_mutex_unlock(&cache_lock);
1357
1358     memset (&statbuf, 0, sizeof (statbuf));
1359     status = stat (file, &statbuf);
1360     if (status != 0)
1361     {
1362       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1363
1364       status = errno;
1365       if (status == ENOENT)
1366         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1367       else
1368         return send_response(sock, RESP_ERR,
1369                              "stat failed with error %i.\n", status);
1370     }
1371     if (!S_ISREG (statbuf.st_mode))
1372       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1373
1374     if (access(file, R_OK|W_OK) != 0)
1375       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1376                            file, rrd_strerror(errno));
1377
1378     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379     if (ci == NULL)
1380     {
1381       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1382
1383       return send_response(sock, RESP_ERR, "malloc failed.\n");
1384     }
1385     memset (ci, 0, sizeof (cache_item_t));
1386
1387     ci->file = strdup (file);
1388     if (ci->file == NULL)
1389     {
1390       free (ci);
1391       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1392
1393       return send_response(sock, RESP_ERR, "strdup failed.\n");
1394     }
1395
1396     wipe_ci_values(ci, now);
1397     ci->flags = CI_FLAGS_IN_TREE;
1398     pthread_cond_init(&ci->flushed, NULL);
1399
1400     pthread_mutex_lock(&cache_lock);
1401
1402     /* another UPDATE might have added this entry in the meantime */
1403     tmp = g_tree_lookup (cache_tree, file);
1404     if (tmp == NULL)
1405       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406     else
1407     {
1408       free_cache_item (ci);
1409       ci = tmp;
1410     }
1411
1412     /* state may have changed while we were unlocked */
1413     if (state == SHUTDOWN)
1414       return -1;
1415   } /* }}} */
1416   assert (ci != NULL);
1417
1418   /* don't re-write updates in replay mode */
1419   if (!JOURNAL_REPLAY(sock))
1420     journal_write("update", orig_buf);
1421
1422   while (buffer_size > 0)
1423   {
1424     char *value;
1425     time_t stamp;
1426     char *eostamp;
1427
1428     status = buffer_get_field (&buffer, &buffer_size, &value);
1429     if (status != 0)
1430     {
1431       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1432       break;
1433     }
1434
1435     /* make sure update time is always moving forward */
1436     stamp = strtol(value, &eostamp, 10);
1437     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1438     {
1439       pthread_mutex_unlock(&cache_lock);
1440       return send_response(sock, RESP_ERR,
1441                            "Cannot find timestamp in '%s'!\n", value);
1442     }
1443     else if (stamp <= ci->last_update_stamp)
1444     {
1445       pthread_mutex_unlock(&cache_lock);
1446       return send_response(sock, RESP_ERR,
1447                            "illegal attempt to update using time %ld when last"
1448                            " update time is %ld (minimum one second step)\n",
1449                            stamp, ci->last_update_stamp);
1450     }
1451     else
1452       ci->last_update_stamp = stamp;
1453
1454     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1455                               &ci->values_alloc, config_alloc_chunk))
1456     {
1457       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1458       continue;
1459     }
1460
1461     values_num++;
1462   }
1463
1464   if (((now - ci->last_flush_time) >= config_write_interval)
1465       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1466       && (ci->values_num > 0))
1467   {
1468     enqueue_cache_item (ci, TAIL);
1469   }
1470
1471   pthread_mutex_unlock (&cache_lock);
1472
1473   if (values_num < 1)
1474     return send_response(sock, RESP_ERR, "No values updated.\n");
1475   else
1476     return send_response(sock, RESP_OK,
1477                          "errors, enqueued %i value(s).\n", values_num);
1478
1479   /* NOTREACHED */
1480   assert(1==0);
1481
1482 } /* }}} int handle_request_update */
1483
1484 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1485 {
1486   char *file, file_tmp[PATH_MAX];
1487   char *cf;
1488
1489   char *start_str;
1490   char *end_str;
1491   time_t start_tm;
1492   time_t end_tm;
1493
1494   unsigned long step;
1495   unsigned long ds_cnt;
1496   char **ds_namv;
1497   rrd_value_t *data;
1498
1499   int status;
1500   unsigned long i;
1501   time_t t;
1502   rrd_value_t *data_ptr;
1503
1504   file = NULL;
1505   cf = NULL;
1506   start_str = NULL;
1507   end_str = NULL;
1508
1509   /* Read the arguments */
1510   do /* while (0) */
1511   {
1512     status = buffer_get_field (&buffer, &buffer_size, &file);
1513     if (status != 0)
1514       break;
1515
1516     status = buffer_get_field (&buffer, &buffer_size, &cf);
1517     if (status != 0)
1518       break;
1519
1520     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1521     if (status != 0)
1522     {
1523       start_str = NULL;
1524       status = 0;
1525       break;
1526     }
1527
1528     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1529     if (status != 0)
1530     {
1531       end_str = NULL;
1532       status = 0;
1533       break;
1534     }
1535   } while (0);
1536
1537   if (status != 0)
1538     return (syntax_error(sock,cmd));
1539
1540   get_abs_path(&file, file_tmp);
1541   if (!check_file_access(file, sock)) return 0;
1542
1543   status = flush_file (file);
1544   if ((status != 0) && (status != ENOENT))
1545     return (send_response (sock, RESP_ERR,
1546           "flush_file (%s) failed with status %i.\n", file, status));
1547
1548   t = time (NULL); /* "now" */
1549
1550   /* Parse start time */
1551   if (start_str != NULL)
1552   {
1553     char *endptr;
1554     long value;
1555
1556     endptr = NULL;
1557     errno = 0;
1558     value = strtol (start_str, &endptr, /* base = */ 0);
1559     if ((endptr == start_str) || (errno != 0))
1560       return (send_response(sock, RESP_ERR,
1561             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1562             start_str));
1563
1564     if (value > 0)
1565       start_tm = (time_t) value;
1566     else
1567       start_tm = (time_t) (t + value);
1568   }
1569   else
1570   {
1571     start_tm = t - 86400;
1572   }
1573
1574   /* Parse end time */
1575   if (end_str != NULL)
1576   {
1577     char *endptr;
1578     long value;
1579
1580     endptr = NULL;
1581     errno = 0;
1582     value = strtol (end_str, &endptr, /* base = */ 0);
1583     if ((endptr == end_str) || (errno != 0))
1584       return (send_response(sock, RESP_ERR,
1585             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1586             end_str));
1587
1588     if (value > 0)
1589       end_tm = (time_t) value;
1590     else
1591       end_tm = (time_t) (t + value);
1592   }
1593   else
1594   {
1595     end_tm = t;
1596   }
1597
1598   step = -1;
1599   ds_cnt = 0;
1600   ds_namv = NULL;
1601   data = NULL;
1602
1603   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1604       &ds_cnt, &ds_namv, &data);
1605   if (status != 0)
1606     return (send_response(sock, RESP_ERR,
1607           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1608
1609   add_response_info (sock, "FlushVersion: %lu\n", 1);
1610   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1611   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1612   add_response_info (sock, "Step: %lu\n", step);
1613   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1614
1615 #define SSTRCAT(buffer,str,buffer_fill) do { \
1616     size_t str_len = strlen (str); \
1617     if ((buffer_fill + str_len) > sizeof (buffer)) \
1618       str_len = sizeof (buffer) - buffer_fill; \
1619     if (str_len > 0) { \
1620       strncpy (buffer + buffer_fill, str, str_len); \
1621       buffer_fill += str_len; \
1622       assert (buffer_fill <= sizeof (buffer)); \
1623       if (buffer_fill == sizeof (buffer)) \
1624         buffer[buffer_fill - 1] = 0; \
1625       else \
1626         buffer[buffer_fill] = 0; \
1627     } \
1628   } while (0)
1629
1630   { /* Add list of DS names */
1631     char linebuf[1024];
1632     size_t linebuf_fill;
1633
1634     memset (linebuf, 0, sizeof (linebuf));
1635     linebuf_fill = 0;
1636     for (i = 0; i < ds_cnt; i++)
1637     {
1638       if (i > 0)
1639         SSTRCAT (linebuf, " ", linebuf_fill);
1640       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1641       rrd_freemem(ds_namv[i]);
1642     }
1643     rrd_freemem(ds_namv);
1644     add_response_info (sock, "DSName: %s\n", linebuf);
1645   }
1646
1647   /* Add the actual data */
1648   assert (step > 0);
1649   data_ptr = data;
1650   for (t = start_tm + step; t <= end_tm; t += step)
1651   {
1652     char linebuf[1024];
1653     size_t linebuf_fill;
1654     char tmp[128];
1655
1656     memset (linebuf, 0, sizeof (linebuf));
1657     linebuf_fill = 0;
1658     for (i = 0; i < ds_cnt; i++)
1659     {
1660       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1661       tmp[sizeof (tmp) - 1] = 0;
1662       SSTRCAT (linebuf, tmp, linebuf_fill);
1663
1664       data_ptr++;
1665     }
1666
1667     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1668   } /* for (t) */
1669   rrd_freemem(data);
1670
1671   return (send_response (sock, RESP_OK, "Success\n"));
1672 #undef SSTRCAT
1673 } /* }}} int handle_request_fetch */
1674
1675 /* we came across a "WROTE" entry during journal replay.
1676  * throw away any values that we have accumulated for this file
1677  */
1678 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1679 {
1680   cache_item_t *ci;
1681   const char *file = buffer;
1682
1683   pthread_mutex_lock(&cache_lock);
1684
1685   ci = g_tree_lookup(cache_tree, file);
1686   if (ci == NULL)
1687   {
1688     pthread_mutex_unlock(&cache_lock);
1689     return (0);
1690   }
1691
1692   if (ci->values)
1693     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1694
1695   wipe_ci_values(ci, now);
1696   remove_from_queue(ci);
1697
1698   pthread_mutex_unlock(&cache_lock);
1699   return (0);
1700 } /* }}} int handle_request_wrote */
1701
1702 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1703 {
1704   char *file, file_tmp[PATH_MAX];
1705   int status;
1706   rrd_info_t *info;
1707
1708   /* obtain filename */
1709   status = buffer_get_field(&buffer, &buffer_size, &file);
1710   if (status != 0)
1711     return syntax_error(sock,cmd);
1712   /* get full pathname */
1713   get_abs_path(&file, file_tmp);
1714   if (!check_file_access(file, sock)) {
1715     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1716   }
1717   /* get data */
1718   rrd_clear_error ();
1719   info = rrd_info_r(file);
1720   if(!info) {
1721     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1722   }
1723   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1724       switch (data->type) {
1725       case RD_I_VAL:
1726           if (isnan(data->value.u_val))
1727               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1728           else
1729               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1730           break;
1731       case RD_I_CNT:
1732           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1733           break;
1734       case RD_I_INT:
1735           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1736           break;
1737       case RD_I_STR:
1738           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1739           break;
1740       case RD_I_BLO:
1741           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1742           break;
1743       }
1744   }
1745
1746   rrd_info_free(info);
1747
1748   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1749 } /* }}} static int handle_request_info  */
1750
1751 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1752 {
1753   char *i, *file, file_tmp[PATH_MAX];
1754   int status;
1755   int idx;
1756   time_t t;
1757
1758   /* obtain filename */
1759   status = buffer_get_field(&buffer, &buffer_size, &file);
1760   if (status != 0)
1761     return syntax_error(sock,cmd);
1762   /* get full pathname */
1763   get_abs_path(&file, file_tmp);
1764   if (!check_file_access(file, sock)) {
1765     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1766   }
1767
1768   status = buffer_get_field(&buffer, &buffer_size, &i);
1769   if (status != 0)
1770     return syntax_error(sock,cmd);
1771   idx = atoi(i);
1772   if(idx<0) { 
1773     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1774   }
1775
1776   /* get data */
1777   rrd_clear_error ();
1778   t = rrd_first_r(file,idx);
1779   if(t<1) {
1780     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1781   }
1782   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1783 } /* }}} static int handle_request_first  */
1784
1785
1786 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1787 {
1788   char *file, file_tmp[PATH_MAX];
1789   int status;
1790   time_t t, from_file, step;
1791   rrd_file_t * rrd_file;
1792   cache_item_t * ci;
1793   rrd_t rrd; 
1794
1795   /* obtain filename */
1796   status = buffer_get_field(&buffer, &buffer_size, &file);
1797   if (status != 0)
1798     return syntax_error(sock,cmd);
1799   /* get full pathname */
1800   get_abs_path(&file, file_tmp);
1801   if (!check_file_access(file, sock)) {
1802     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1803   }
1804   rrd_clear_error();
1805   rrd_init(&rrd);
1806   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1807   if(!rrd_file) {
1808     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1809   }
1810   from_file = rrd.live_head->last_up;
1811   step = rrd.stat_head->pdp_step;
1812   rrd_close(rrd_file);
1813   pthread_mutex_lock(&cache_lock);
1814   ci = g_tree_lookup(cache_tree, file);
1815   if (ci)
1816     t = ci->last_update_stamp;
1817   else
1818     t = from_file;
1819   pthread_mutex_unlock(&cache_lock);
1820   t -= t % step;
1821   rrd_free(&rrd);
1822   if(t<1) {
1823     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1824   }
1825   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1826 } /* }}} static int handle_request_last  */
1827
1828 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1829 {
1830   char *file, file_tmp[PATH_MAX];
1831   char *tok;
1832   int ac = 0;
1833   char *av[128];
1834   int status;
1835   unsigned long step = 300;
1836   time_t last_up = time(NULL)-10;
1837   int no_overwrite = opt_no_overwrite;
1838
1839
1840   /* obtain filename */
1841   status = buffer_get_field(&buffer, &buffer_size, &file);
1842   if (status != 0)
1843     return syntax_error(sock,cmd);
1844   /* get full pathname */
1845   get_abs_path(&file, file_tmp);
1846   if (!check_file_access(file, sock)) {
1847     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1848   }
1849   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1850
1851   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1852     if( ! strncmp(tok,"-b",2) ) {
1853       status = buffer_get_field(&buffer, &buffer_size, &tok );
1854       if (status != 0) return syntax_error(sock,cmd);
1855       last_up = (time_t) atol(tok);
1856       continue;
1857     }
1858     if( ! strncmp(tok,"-s",2) ) {
1859       status = buffer_get_field(&buffer, &buffer_size, &tok );
1860       if (status != 0) return syntax_error(sock,cmd);
1861       step = atol(tok);
1862       continue;
1863     }
1864     if( ! strncmp(tok,"-O",2) ) {
1865       no_overwrite = 1;
1866       continue;
1867     }
1868     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1869     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1870     return syntax_error(sock,cmd);
1871   }
1872   if(step<1) {
1873     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1874   }
1875   if (last_up < 3600 * 24 * 365 * 10) {
1876     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1877   }
1878
1879   rrd_clear_error ();
1880   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1881
1882   if(!status) {
1883     return send_response(sock, RESP_OK, "RRD created OK\n");
1884   }
1885   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1886 } /* }}} static int handle_request_create  */
1887
1888 /* start "BATCH" processing */
1889 static int batch_start (HANDLER_PROTO) /* {{{ */
1890 {
1891   int status;
1892   if (sock->batch_start)
1893     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1894
1895   status = send_response(sock, RESP_OK,
1896                          "Go ahead.  End with dot '.' on its own line.\n");
1897   sock->batch_start = time(NULL);
1898   sock->batch_cmd = 0;
1899
1900   return status;
1901 } /* }}} static int batch_start */
1902
1903 /* finish "BATCH" processing and return results to the client */
1904 static int batch_done (HANDLER_PROTO) /* {{{ */
1905 {
1906   assert(sock->batch_start);
1907   sock->batch_start = 0;
1908   sock->batch_cmd  = 0;
1909   return send_response(sock, RESP_OK, "errors\n");
1910 } /* }}} static int batch_done */
1911
1912 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1913 {
1914   return -1;
1915 } /* }}} static int handle_request_quit */
1916
1917 static command_t list_of_commands[] = { /* {{{ */
1918   {
1919     "UPDATE",
1920     handle_request_update,
1921     CMD_CONTEXT_ANY,
1922     "UPDATE <filename> <values> [<values> ...]\n"
1923     ,
1924     "Adds the given file to the internal cache if it is not yet known and\n"
1925     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1926     "for details.\n"
1927     "\n"
1928     "Each <values> has the following form:\n"
1929     "  <values> = <time>:<value>[:<value>[...]]\n"
1930     "See the rrdupdate(1) manpage for details.\n"
1931   },
1932   {
1933     "WROTE",
1934     handle_request_wrote,
1935     CMD_CONTEXT_JOURNAL,
1936     NULL,
1937     NULL
1938   },
1939   {
1940     "FLUSH",
1941     handle_request_flush,
1942     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1943     "FLUSH <filename>\n"
1944     ,
1945     "Adds the given filename to the head of the update queue and returns\n"
1946     "after it has been dequeued.\n"
1947   },
1948   {
1949     "FLUSHALL",
1950     handle_request_flushall,
1951     CMD_CONTEXT_CLIENT,
1952     "FLUSHALL\n"
1953     ,
1954     "Triggers writing of all pending updates.  Returns immediately.\n"
1955   },
1956   {
1957     "PENDING",
1958     handle_request_pending,
1959     CMD_CONTEXT_CLIENT,
1960     "PENDING <filename>\n"
1961     ,
1962     "Shows any 'pending' updates for a file, in order.\n"
1963     "The updates shown have not yet been written to the underlying RRD file.\n"
1964   },
1965   {
1966     "FORGET",
1967     handle_request_forget,
1968     CMD_CONTEXT_ANY,
1969     "FORGET <filename>\n"
1970     ,
1971     "Removes the file completely from the cache.\n"
1972     "Any pending updates for the file will be lost.\n"
1973   },
1974   {
1975     "QUEUE",
1976     handle_request_queue,
1977     CMD_CONTEXT_CLIENT,
1978     "QUEUE\n"
1979     ,
1980         "Shows all files in the output queue.\n"
1981     "The output is zero or more lines in the following format:\n"
1982     "(where <num_vals> is the number of values to be written)\n"
1983     "\n"
1984     "<num_vals> <filename>\n"
1985   },
1986   {
1987     "STATS",
1988     handle_request_stats,
1989     CMD_CONTEXT_CLIENT,
1990     "STATS\n"
1991     ,
1992     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1993     "a description of the values.\n"
1994   },
1995   {
1996     "HELP",
1997     handle_request_help,
1998     CMD_CONTEXT_CLIENT,
1999     "HELP [<command>]\n",
2000     NULL, /* special! */
2001   },
2002   {
2003     "BATCH",
2004     batch_start,
2005     CMD_CONTEXT_CLIENT,
2006     "BATCH\n"
2007     ,
2008     "The 'BATCH' command permits the client to initiate a bulk load\n"
2009     "   of commands to rrdcached.\n"
2010     "\n"
2011     "Usage:\n"
2012     "\n"
2013     "    client: BATCH\n"
2014     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2015     "    client: command #1\n"
2016     "    client: command #2\n"
2017     "    client: ... and so on\n"
2018     "    client: .\n"
2019     "    server: 2 errors\n"
2020     "    server: 7 message for command #7\n"
2021     "    server: 9 message for command #9\n"
2022     "\n"
2023     "For more information, consult the rrdcached(1) documentation.\n"
2024   },
2025   {
2026     ".",   /* BATCH terminator */
2027     batch_done,
2028     CMD_CONTEXT_BATCH,
2029     NULL,
2030     NULL
2031   },
2032   {
2033     "FETCH",
2034     handle_request_fetch,
2035     CMD_CONTEXT_CLIENT,
2036     "FETCH <file> <CF> [<start> [<end>]]\n"
2037     ,
2038     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2039   },
2040   {
2041     "INFO",
2042     handle_request_info,
2043     CMD_CONTEXT_CLIENT,
2044     "INFO <filename>\n",
2045     "The INFO command retrieves information about a specified RRD file.\n"
2046     "This is returned in standard rrdinfo format, a sequence of lines\n"
2047     "with the format <keyname> = <value>\n"
2048     "Note that this is the data as of the last update of the RRD file itself,\n"
2049     "not the last time data was received via rrdcached, so there may be pending\n"
2050     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2051   },
2052   {
2053     "FIRST",
2054     handle_request_first,
2055     CMD_CONTEXT_CLIENT,
2056     "FIRST <filename> <rra index>\n",
2057     "The FIRST command retrieves the first data time for a specified RRA in\n"
2058     "an RRD file.\n"
2059   },
2060   {
2061     "LAST",
2062     handle_request_last,
2063     CMD_CONTEXT_CLIENT,
2064     "LAST <filename>\n",
2065     "The LAST command retrieves the last update time for a specified RRD file.\n"
2066     "Note that this is the time of the last update of the RRD file itself, not\n"
2067     "the last time data was received via rrdcached, so there may be pending\n"
2068     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2069   },
2070   {
2071     "CREATE",
2072     handle_request_create,
2073     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2074     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2075     "The CREATE command will create an RRD file, overwriting any existing file\n"
2076     "unless the -O option is given or rrdcached was started with the -O option.\n"
2077     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2078     "not acceptable) and the step is in seconds (default is 300).\n"
2079     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2080   },
2081   {
2082     "QUIT",
2083     handle_request_quit,
2084     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2085     "QUIT\n"
2086     ,
2087     "Disconnect from rrdcached.\n"
2088   }
2089 }; /* }}} command_t list_of_commands[] */
2090 static size_t list_of_commands_len = sizeof (list_of_commands)
2091   / sizeof (list_of_commands[0]);
2092
2093 static command_t *find_command(char *cmd)
2094 {
2095   size_t i;
2096
2097   for (i = 0; i < list_of_commands_len; i++)
2098     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2099       return (&list_of_commands[i]);
2100   return NULL;
2101 }
2102
2103 /* We currently use the index in the `list_of_commands' array as a bit position
2104  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2105  * outside these functions so that switching to a more elegant storage method
2106  * is easily possible. */
2107 static ssize_t find_command_index (const char *cmd) /* {{{ */
2108 {
2109   size_t i;
2110
2111   for (i = 0; i < list_of_commands_len; i++)
2112     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2113       return ((ssize_t) i);
2114   return (-1);
2115 } /* }}} ssize_t find_command_index */
2116
2117 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2118     const char *cmd)
2119 {
2120   ssize_t i;
2121
2122   if (JOURNAL_REPLAY(sock))
2123     return (1);
2124
2125   if (cmd == NULL)
2126     return (-1);
2127
2128   if ((strcasecmp ("QUIT", cmd) == 0)
2129       || (strcasecmp ("HELP", cmd) == 0))
2130     return (1);
2131   else if (strcmp (".", cmd) == 0)
2132     cmd = "BATCH";
2133
2134   i = find_command_index (cmd);
2135   if (i < 0)
2136     return (-1);
2137   assert (i < 32);
2138
2139   if ((sock->permissions & (1 << i)) != 0)
2140     return (1);
2141   return (0);
2142 } /* }}} int socket_permission_check */
2143
2144 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2145     const char *cmd)
2146 {
2147   ssize_t i;
2148
2149   i = find_command_index (cmd);
2150   if (i < 0)
2151     return (-1);
2152   assert (i < 32);
2153
2154   sock->permissions |= (1 << i);
2155   return (0);
2156 } /* }}} int socket_permission_add */
2157
2158 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2159 {
2160   sock->permissions = 0;
2161 } /* }}} socket_permission_clear */
2162
2163 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2164     listen_socket_t *src)
2165 {
2166   dest->permissions = src->permissions;
2167 } /* }}} socket_permission_copy */
2168
2169 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2170 {
2171   size_t i;
2172
2173   sock->permissions = 0;
2174   for (i = 0; i < list_of_commands_len; i++)
2175     sock->permissions |= (1 << i);
2176 } /* }}} void socket_permission_set_all */
2177
2178 /* check whether commands are received in the expected context */
2179 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2180 {
2181   if (JOURNAL_REPLAY(sock))
2182     return (cmd->context & CMD_CONTEXT_JOURNAL);
2183   else if (sock->batch_start)
2184     return (cmd->context & CMD_CONTEXT_BATCH);
2185   else
2186     return (cmd->context & CMD_CONTEXT_CLIENT);
2187
2188   /* NOTREACHED */
2189   assert(1==0);
2190 }
2191
2192 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2193 {
2194   int status;
2195   char *cmd_str;
2196   char *resp_txt;
2197   command_t *help = NULL;
2198
2199   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2200   if (status == 0)
2201     help = find_command(cmd_str);
2202
2203   if (help && (help->syntax || help->help))
2204   {
2205     char tmp[RRD_CMD_MAX];
2206
2207     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2208     resp_txt = tmp;
2209
2210     if (help->syntax)
2211       add_response_info(sock, "Usage: %s\n", help->syntax);
2212
2213     if (help->help)
2214       add_response_info(sock, "%s\n", help->help);
2215   }
2216   else
2217   {
2218     size_t i;
2219
2220     resp_txt = "Command overview\n";
2221
2222     for (i = 0; i < list_of_commands_len; i++)
2223     {
2224       if (list_of_commands[i].syntax == NULL)
2225         continue;
2226       add_response_info (sock, "%s", list_of_commands[i].syntax);
2227     }
2228   }
2229
2230   return send_response(sock, RESP_OK, resp_txt);
2231 } /* }}} int handle_request_help */
2232
2233 static int handle_request (DISPATCH_PROTO) /* {{{ */
2234 {
2235   char *buffer_ptr = buffer;
2236   char *cmd_str = NULL;
2237   command_t *cmd = NULL;
2238   int status;
2239
2240   assert (buffer[buffer_size - 1] == '\0');
2241
2242   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2243   if (status != 0)
2244   {
2245     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2246     return (-1);
2247   }
2248
2249   if (sock != NULL && sock->batch_start)
2250     sock->batch_cmd++;
2251
2252   cmd = find_command(cmd_str);
2253   if (!cmd)
2254     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2255
2256   if (!socket_permission_check (sock, cmd->cmd))
2257     return send_response(sock, RESP_ERR, "Permission denied.\n");
2258
2259   if (!command_check_context(sock, cmd))
2260     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2261
2262   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2263 } /* }}} int handle_request */
2264
2265 static void journal_set_free (journal_set *js) /* {{{ */
2266 {
2267   if (js == NULL)
2268     return;
2269
2270   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2271
2272   free(js);
2273 } /* }}} journal_set_free */
2274
2275 static void journal_set_remove (journal_set *js) /* {{{ */
2276 {
2277   if (js == NULL)
2278     return;
2279
2280   for (uint i=0; i < js->files_num; i++)
2281   {
2282     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2283     unlink(js->files[i]);
2284   }
2285 } /* }}} journal_set_remove */
2286
2287 /* close current journal file handle.
2288  * MUST hold journal_lock before calling */
2289 static void journal_close(void) /* {{{ */
2290 {
2291   if (journal_fh != NULL)
2292   {
2293     if (fclose(journal_fh) != 0)
2294       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2295   }
2296
2297   journal_fh = NULL;
2298   journal_size = 0;
2299 } /* }}} journal_close */
2300
2301 /* MUST hold journal_lock before calling */
2302 static void journal_new_file(void) /* {{{ */
2303 {
2304   struct timeval now;
2305   int  new_fd;
2306   char new_file[PATH_MAX + 1];
2307
2308   assert(journal_dir != NULL);
2309   assert(journal_cur != NULL);
2310
2311   journal_close();
2312
2313   gettimeofday(&now, NULL);
2314   /* this format assures that the files sort in strcmp() order */
2315   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2316            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2317
2318   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2319                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2320   if (new_fd < 0)
2321     goto error;
2322
2323   journal_fh = fdopen(new_fd, "a");
2324   if (journal_fh == NULL)
2325     goto error;
2326
2327   journal_size = ftell(journal_fh);
2328   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2329
2330   /* record the file in the journal set */
2331   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2332
2333   return;
2334
2335 error:
2336   RRDD_LOG(LOG_CRIT,
2337            "JOURNALING DISABLED: Error while trying to create %s : %s",
2338            new_file, rrd_strerror(errno));
2339   RRDD_LOG(LOG_CRIT,
2340            "JOURNALING DISABLED: All values will be flushed at shutdown");
2341
2342   close(new_fd);
2343   config_flush_at_shutdown = 1;
2344
2345 } /* }}} journal_new_file */
2346
2347 /* MUST NOT hold journal_lock before calling this */
2348 static void journal_rotate(void) /* {{{ */
2349 {
2350   journal_set *old_js = NULL;
2351
2352   if (journal_dir == NULL)
2353     return;
2354
2355   RRDD_LOG(LOG_DEBUG, "rotating journals");
2356
2357   pthread_mutex_lock(&stats_lock);
2358   ++stats_journal_rotate;
2359   pthread_mutex_unlock(&stats_lock);
2360
2361   pthread_mutex_lock(&journal_lock);
2362
2363   journal_close();
2364
2365   /* rotate the journal sets */
2366   old_js = journal_old;
2367   journal_old = journal_cur;
2368   journal_cur = calloc(1, sizeof(journal_set));
2369
2370   if (journal_cur != NULL)
2371     journal_new_file();
2372   else
2373     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2374
2375   pthread_mutex_unlock(&journal_lock);
2376
2377   journal_set_remove(old_js);
2378   journal_set_free  (old_js);
2379
2380 } /* }}} static void journal_rotate */
2381
2382 /* MUST hold journal_lock when calling */
2383 static void journal_done(void) /* {{{ */
2384 {
2385   if (journal_cur == NULL)
2386     return;
2387
2388   journal_close();
2389
2390   if (config_flush_at_shutdown)
2391   {
2392     RRDD_LOG(LOG_INFO, "removing journals");
2393     journal_set_remove(journal_old);
2394     journal_set_remove(journal_cur);
2395   }
2396   else
2397   {
2398     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2399              "journals will be used at next startup");
2400   }
2401
2402   journal_set_free(journal_cur);
2403   journal_set_free(journal_old);
2404   free(journal_dir);
2405
2406 } /* }}} static void journal_done */
2407
2408 static int journal_write(char *cmd, char *args) /* {{{ */
2409 {
2410   int chars;
2411
2412   if (journal_fh == NULL)
2413     return 0;
2414
2415   pthread_mutex_lock(&journal_lock);
2416   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2417   journal_size += chars;
2418
2419   if (journal_size > JOURNAL_MAX)
2420     journal_new_file();
2421
2422   pthread_mutex_unlock(&journal_lock);
2423
2424   if (chars > 0)
2425   {
2426     pthread_mutex_lock(&stats_lock);
2427     stats_journal_bytes += chars;
2428     pthread_mutex_unlock(&stats_lock);
2429   }
2430
2431   return chars;
2432 } /* }}} static int journal_write */
2433
2434 static int journal_replay (const char *file) /* {{{ */
2435 {
2436   FILE *fh;
2437   int entry_cnt = 0;
2438   int fail_cnt = 0;
2439   uint64_t line = 0;
2440   char entry[RRD_CMD_MAX];
2441   time_t now;
2442
2443   if (file == NULL) return 0;
2444
2445   {
2446     char *reason = "unknown error";
2447     int status = 0;
2448     struct stat statbuf;
2449
2450     memset(&statbuf, 0, sizeof(statbuf));
2451     if (stat(file, &statbuf) != 0)
2452     {
2453       reason = "stat error";
2454       status = errno;
2455     }
2456     else if (!S_ISREG(statbuf.st_mode))
2457     {
2458       reason = "not a regular file";
2459       status = EPERM;
2460     }
2461     if (statbuf.st_uid != daemon_uid)
2462     {
2463       reason = "not owned by daemon user";
2464       status = EACCES;
2465     }
2466     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2467     {
2468       reason = "must not be user/group writable";
2469       status = EACCES;
2470     }
2471
2472     if (status != 0)
2473     {
2474       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2475                file, rrd_strerror(status), reason);
2476       return 0;
2477     }
2478   }
2479
2480   fh = fopen(file, "r");
2481   if (fh == NULL)
2482   {
2483     if (errno != ENOENT)
2484       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2485                file, rrd_strerror(errno));
2486     return 0;
2487   }
2488   else
2489     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2490
2491   now = time(NULL);
2492
2493   while(!feof(fh))
2494   {
2495     size_t entry_len;
2496
2497     ++line;
2498     if (fgets(entry, sizeof(entry), fh) == NULL)
2499       break;
2500     entry_len = strlen(entry);
2501
2502     /* check \n termination in case journal writing crashed mid-line */
2503     if (entry_len == 0)
2504       continue;
2505     else if (entry[entry_len - 1] != '\n')
2506     {
2507       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2508       ++fail_cnt;
2509       continue;
2510     }
2511
2512     entry[entry_len - 1] = '\0';
2513
2514     if (handle_request(NULL, now, entry, entry_len) == 0)
2515       ++entry_cnt;
2516     else
2517       ++fail_cnt;
2518   }
2519
2520   fclose(fh);
2521
2522   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2523            entry_cnt, fail_cnt);
2524
2525   return entry_cnt > 0 ? 1 : 0;
2526 } /* }}} static int journal_replay */
2527
2528 static int journal_sort(const void *v1, const void *v2)
2529 {
2530   char **jn1 = (char **) v1;
2531   char **jn2 = (char **) v2;
2532
2533   return strcmp(*jn1,*jn2);
2534 }
2535
2536 static void journal_init(void) /* {{{ */
2537 {
2538   int had_journal = 0;
2539   DIR *dir;
2540   struct dirent *dent;
2541   char path[PATH_MAX+1];
2542
2543   if (journal_dir == NULL) return;
2544
2545   pthread_mutex_lock(&journal_lock);
2546
2547   journal_cur = calloc(1, sizeof(journal_set));
2548   if (journal_cur == NULL)
2549   {
2550     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2551     return;
2552   }
2553
2554   RRDD_LOG(LOG_INFO, "checking for journal files");
2555
2556   /* Handle old journal files during transition.  This gives them the
2557    * correct sort order.  TODO: remove after first release
2558    */
2559   {
2560     char old_path[PATH_MAX+1];
2561     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2562     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2563     rename(old_path, path);
2564
2565     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2566     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2567     rename(old_path, path);
2568   }
2569
2570   dir = opendir(journal_dir);
2571   if (!dir) {
2572     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2573     return;
2574   }
2575   while ((dent = readdir(dir)) != NULL)
2576   {
2577     /* looks like a journal file? */
2578     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2579       continue;
2580
2581     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2582
2583     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2584     {
2585       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2586                dent->d_name);
2587       break;
2588     }
2589   }
2590   closedir(dir);
2591
2592   qsort(journal_cur->files, journal_cur->files_num,
2593         sizeof(journal_cur->files[0]), journal_sort);
2594
2595   for (uint i=0; i < journal_cur->files_num; i++)
2596     had_journal += journal_replay(journal_cur->files[i]);
2597
2598   journal_new_file();
2599
2600   /* it must have been a crash.  start a flush */
2601   if (had_journal && config_flush_at_shutdown)
2602     flush_old_values(-1);
2603
2604   pthread_mutex_unlock(&journal_lock);
2605
2606   RRDD_LOG(LOG_INFO, "journal processing complete");
2607
2608 } /* }}} static void journal_init */
2609
2610 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2611 {
2612   assert(sock != NULL);
2613
2614   free(sock->rbuf);  sock->rbuf = NULL;
2615   free(sock->wbuf);  sock->wbuf = NULL;
2616   free(sock);
2617 } /* }}} void free_listen_socket */
2618
2619 static void close_connection(listen_socket_t *sock) /* {{{ */
2620 {
2621   if (sock->fd >= 0)
2622   {
2623     close(sock->fd);
2624     sock->fd = -1;
2625   }
2626
2627   free_listen_socket(sock);
2628
2629 } /* }}} void close_connection */
2630
2631 static void *connection_thread_main (void *args) /* {{{ */
2632 {
2633   listen_socket_t *sock;
2634   int fd;
2635
2636   sock = (listen_socket_t *) args;
2637   fd = sock->fd;
2638
2639   /* init read buffers */
2640   sock->next_read = sock->next_cmd = 0;
2641   sock->rbuf = malloc(RBUF_SIZE);
2642   if (sock->rbuf == NULL)
2643   {
2644     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2645     close_connection(sock);
2646     return NULL;
2647   }
2648
2649   pthread_mutex_lock (&connection_threads_lock);
2650 #ifdef HAVE_LIBWRAP
2651   /* LIBWRAP does not support multiple threads! By putting this code
2652      inside pthread_mutex_lock we do not have to worry about request_info
2653      getting overwritten by another thread.
2654   */
2655   struct request_info req;
2656   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2657   fromhost(&req);
2658   if(!hosts_access(&req)) {
2659     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2660     pthread_mutex_unlock (&connection_threads_lock);
2661     close_connection(sock);
2662     return NULL;
2663   }
2664 #endif /* HAVE_LIBWRAP */
2665   connection_threads_num++;
2666   pthread_mutex_unlock (&connection_threads_lock);
2667
2668   while (state == RUNNING)
2669   {
2670     char *cmd;
2671     ssize_t cmd_len;
2672     ssize_t rbytes;
2673     time_t now;
2674
2675     struct pollfd pollfd;
2676     int status;
2677
2678     pollfd.fd = fd;
2679     pollfd.events = POLLIN | POLLPRI;
2680     pollfd.revents = 0;
2681
2682     status = poll (&pollfd, 1, /* timeout = */ 500);
2683     if (state != RUNNING)
2684       break;
2685     else if (status == 0) /* timeout */
2686       continue;
2687     else if (status < 0) /* error */
2688     {
2689       status = errno;
2690       if (status != EINTR)
2691         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2692       continue;
2693     }
2694
2695     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2696       break;
2697     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2698     {
2699       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2700           "poll(2) returned something unexpected: %#04hx",
2701           pollfd.revents);
2702       break;
2703     }
2704
2705     rbytes = read(fd, sock->rbuf + sock->next_read,
2706                   RBUF_SIZE - sock->next_read);
2707     if (rbytes < 0)
2708     {
2709       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2710       break;
2711     }
2712     else if (rbytes == 0)
2713       break; /* eof */
2714
2715     sock->next_read += rbytes;
2716
2717     if (sock->batch_start)
2718       now = sock->batch_start;
2719     else
2720       now = time(NULL);
2721
2722     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2723     {
2724       status = handle_request (sock, now, cmd, cmd_len+1);
2725       if (status != 0)
2726         goto out_close;
2727     }
2728   }
2729
2730 out_close:
2731   close_connection(sock);
2732
2733   /* Remove this thread from the connection threads list */
2734   pthread_mutex_lock (&connection_threads_lock);
2735   connection_threads_num--;
2736   if (connection_threads_num <= 0)
2737     pthread_cond_broadcast(&connection_threads_done);
2738   pthread_mutex_unlock (&connection_threads_lock);
2739
2740   return (NULL);
2741 } /* }}} void *connection_thread_main */
2742
2743 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2744 {
2745   int fd;
2746   struct sockaddr_un sa;
2747   listen_socket_t *temp;
2748   int status;
2749   const char *path;
2750   char *path_copy, *dir;
2751
2752   path = sock->addr;
2753   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2754     path += strlen("unix:");
2755
2756   /* dirname may modify its argument */
2757   path_copy = strdup(path);
2758   if (path_copy == NULL)
2759   {
2760     fprintf(stderr, "rrdcached: strdup(): %s\n",
2761         rrd_strerror(errno));
2762     return (-1);
2763   }
2764
2765   dir = dirname(path_copy);
2766   if (rrd_mkdir_p(dir, 0777) != 0)
2767   {
2768     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2769         dir, rrd_strerror(errno));
2770     return (-1);
2771   }
2772
2773   free(path_copy);
2774
2775   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2776       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2777   if (temp == NULL)
2778   {
2779     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2780     return (-1);
2781   }
2782   listen_fds = temp;
2783   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2784
2785   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2786   if (fd < 0)
2787   {
2788     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2789              rrd_strerror(errno));
2790     return (-1);
2791   }
2792
2793   memset (&sa, 0, sizeof (sa));
2794   sa.sun_family = AF_UNIX;
2795   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2796
2797   /* if we've gotten this far, we own the pid file.  any daemon started
2798    * with the same args must not be alive.  therefore, ensure that we can
2799    * create the socket...
2800    */
2801   unlink(path);
2802
2803   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2804   if (status != 0)
2805   {
2806     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2807              path, rrd_strerror(errno));
2808     close (fd);
2809     return (-1);
2810   }
2811
2812   /* tweak the sockets group ownership */
2813   if (sock->socket_group != (gid_t)-1)
2814   {
2815     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2816          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2817     {
2818       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2819     }
2820   }
2821
2822   if (sock->socket_permissions != (mode_t)-1)
2823   {
2824     if (chmod(path, sock->socket_permissions) != 0)
2825       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2826           (unsigned int)sock->socket_permissions, strerror(errno));
2827   }
2828
2829   status = listen (fd, /* backlog = */ 10);
2830   if (status != 0)
2831   {
2832     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2833              path, rrd_strerror(errno));
2834     close (fd);
2835     unlink (path);
2836     return (-1);
2837   }
2838
2839   listen_fds[listen_fds_num].fd = fd;
2840   listen_fds[listen_fds_num].family = PF_UNIX;
2841   strncpy(listen_fds[listen_fds_num].addr, path,
2842           sizeof (listen_fds[listen_fds_num].addr) - 1);
2843   listen_fds_num++;
2844
2845   return (0);
2846 } /* }}} int open_listen_socket_unix */
2847
2848 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2849 {
2850   struct addrinfo ai_hints;
2851   struct addrinfo *ai_res;
2852   struct addrinfo *ai_ptr;
2853   char addr_copy[NI_MAXHOST];
2854   char *addr;
2855   char *port;
2856   int status;
2857
2858   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2859   addr_copy[sizeof (addr_copy) - 1] = 0;
2860   addr = addr_copy;
2861
2862   memset (&ai_hints, 0, sizeof (ai_hints));
2863   ai_hints.ai_flags = 0;
2864 #ifdef AI_ADDRCONFIG
2865   ai_hints.ai_flags |= AI_ADDRCONFIG;
2866 #endif
2867   ai_hints.ai_family = AF_UNSPEC;
2868   ai_hints.ai_socktype = SOCK_STREAM;
2869
2870   port = NULL;
2871   if (*addr == '[') /* IPv6+port format */
2872   {
2873     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2874     addr++;
2875
2876     port = strchr (addr, ']');
2877     if (port == NULL)
2878     {
2879       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2880       return (-1);
2881     }
2882     *port = 0;
2883     port++;
2884
2885     if (*port == ':')
2886       port++;
2887     else if (*port == 0)
2888       port = NULL;
2889     else
2890     {
2891       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2892       return (-1);
2893     }
2894   } /* if (*addr == '[') */
2895   else
2896   {
2897     port = rindex(addr, ':');
2898     if (port != NULL)
2899     {
2900       *port = 0;
2901       port++;
2902     }
2903   }
2904   ai_res = NULL;
2905   status = getaddrinfo (addr,
2906                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2907                         &ai_hints, &ai_res);
2908   if (status != 0)
2909   {
2910     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2911              addr, gai_strerror (status));
2912     return (-1);
2913   }
2914
2915   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2916   {
2917     int fd;
2918     listen_socket_t *temp;
2919     int one = 1;
2920
2921     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2922         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2923     if (temp == NULL)
2924     {
2925       fprintf (stderr,
2926                "rrdcached: open_listen_socket_network: realloc failed.\n");
2927       continue;
2928     }
2929     listen_fds = temp;
2930     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2931
2932     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2933     if (fd < 0)
2934     {
2935       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2936                rrd_strerror(errno));
2937       continue;
2938     }
2939
2940     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2941
2942     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2943     if (status != 0)
2944     {
2945       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2946                sock->addr, rrd_strerror(errno));
2947       close (fd);
2948       continue;
2949     }
2950
2951     status = listen (fd, /* backlog = */ 10);
2952     if (status != 0)
2953     {
2954       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2955                sock->addr, rrd_strerror(errno));
2956       close (fd);
2957       freeaddrinfo(ai_res);
2958       return (-1);
2959     }
2960
2961     listen_fds[listen_fds_num].fd = fd;
2962     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2963     listen_fds_num++;
2964   } /* for (ai_ptr) */
2965
2966   freeaddrinfo(ai_res);
2967   return (0);
2968 } /* }}} static int open_listen_socket_network */
2969
2970 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2971 {
2972   assert(sock != NULL);
2973   assert(sock->addr != NULL);
2974
2975   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2976       || sock->addr[0] == '/')
2977     return (open_listen_socket_unix(sock));
2978   else
2979     return (open_listen_socket_network(sock));
2980 } /* }}} int open_listen_socket */
2981
2982 static int close_listen_sockets (void) /* {{{ */
2983 {
2984   size_t i;
2985
2986   for (i = 0; i < listen_fds_num; i++)
2987   {
2988     close (listen_fds[i].fd);
2989
2990     if (listen_fds[i].family == PF_UNIX)
2991       unlink(listen_fds[i].addr);
2992   }
2993
2994   free (listen_fds);
2995   listen_fds = NULL;
2996   listen_fds_num = 0;
2997
2998   return (0);
2999 } /* }}} int close_listen_sockets */
3000
3001 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3002 {
3003   struct pollfd *pollfds;
3004   int pollfds_num;
3005   int status;
3006   int i;
3007
3008   if (listen_fds_num < 1)
3009   {
3010     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3011     return (NULL);
3012   }
3013
3014   pollfds_num = listen_fds_num;
3015   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3016   if (pollfds == NULL)
3017   {
3018     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3019     return (NULL);
3020   }
3021   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3022
3023   RRDD_LOG(LOG_INFO, "listening for connections");
3024
3025   while (state == RUNNING)
3026   {
3027     for (i = 0; i < pollfds_num; i++)
3028     {
3029       pollfds[i].fd = listen_fds[i].fd;
3030       pollfds[i].events = POLLIN | POLLPRI;
3031       pollfds[i].revents = 0;
3032     }
3033
3034     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3035     if (state != RUNNING)
3036       break;
3037     else if (status == 0) /* timeout */
3038       continue;
3039     else if (status < 0) /* error */
3040     {
3041       status = errno;
3042       if (status != EINTR)
3043       {
3044         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3045       }
3046       continue;
3047     }
3048
3049     for (i = 0; i < pollfds_num; i++)
3050     {
3051       listen_socket_t *client_sock;
3052       struct sockaddr_storage client_sa;
3053       socklen_t client_sa_size;
3054       pthread_t tid;
3055       pthread_attr_t attr;
3056
3057       if (pollfds[i].revents == 0)
3058         continue;
3059
3060       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3061       {
3062         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3063             "poll(2) returned something unexpected for listen FD #%i.",
3064             pollfds[i].fd);
3065         continue;
3066       }
3067
3068       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3069       if (client_sock == NULL)
3070       {
3071         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3072         continue;
3073       }
3074       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3075
3076       client_sa_size = sizeof (client_sa);
3077       client_sock->fd = accept (pollfds[i].fd,
3078           (struct sockaddr *) &client_sa, &client_sa_size);
3079       if (client_sock->fd < 0)
3080       {
3081         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3082         free(client_sock);
3083         continue;
3084       }
3085
3086       pthread_attr_init (&attr);
3087       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3088
3089       status = pthread_create (&tid, &attr, connection_thread_main,
3090                                client_sock);
3091       if (status != 0)
3092       {
3093         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3094         close_connection(client_sock);
3095         continue;
3096       }
3097     } /* for (pollfds_num) */
3098   } /* while (state == RUNNING) */
3099
3100   RRDD_LOG(LOG_INFO, "starting shutdown");
3101
3102   close_listen_sockets ();
3103
3104   pthread_mutex_lock (&connection_threads_lock);
3105   while (connection_threads_num > 0)
3106     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3107   pthread_mutex_unlock (&connection_threads_lock);
3108
3109   free(pollfds);
3110
3111   return (NULL);
3112 } /* }}} void *listen_thread_main */
3113
3114 static int daemonize (void) /* {{{ */
3115 {
3116   int pid_fd;
3117   char *base_dir;
3118
3119   daemon_uid = geteuid();
3120
3121   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3122   if (pid_fd < 0)
3123     pid_fd = check_pidfile();
3124   if (pid_fd < 0)
3125     return pid_fd;
3126
3127   /* open all the listen sockets */
3128   if (config_listen_address_list_len > 0)
3129   {
3130     for (size_t i = 0; i < config_listen_address_list_len; i++)
3131       open_listen_socket (config_listen_address_list[i]);
3132
3133     rrd_free_ptrs((void ***) &config_listen_address_list,
3134                   &config_listen_address_list_len);
3135   }
3136   else
3137   {
3138     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3139         sizeof(default_socket.addr) - 1);
3140     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3141
3142     if (default_socket.permissions == 0)
3143       socket_permission_set_all (&default_socket);
3144
3145     open_listen_socket (&default_socket);
3146   }
3147
3148   if (listen_fds_num < 1)
3149   {
3150     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3151     goto error;
3152   }
3153
3154   if (!stay_foreground)
3155   {
3156     pid_t child;
3157
3158     child = fork ();
3159     if (child < 0)
3160     {
3161       fprintf (stderr, "daemonize: fork(2) failed.\n");
3162       goto error;
3163     }
3164     else if (child > 0)
3165       exit(0);
3166
3167     /* Become session leader */
3168     setsid ();
3169
3170     /* Open the first three file descriptors to /dev/null */
3171     close (2);
3172     close (1);
3173     close (0);
3174
3175     open ("/dev/null", O_RDWR);
3176     if (dup(0) == -1 || dup(0) == -1){
3177         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3178     }
3179   } /* if (!stay_foreground) */
3180
3181   /* Change into the /tmp directory. */
3182   base_dir = (config_base_dir != NULL)
3183     ? config_base_dir
3184     : "/tmp";
3185
3186   if (chdir (base_dir) != 0)
3187   {
3188     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3189     goto error;
3190   }
3191
3192   install_signal_handlers();
3193
3194   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3195   RRDD_LOG(LOG_INFO, "starting up");
3196
3197   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3198                                 (GDestroyNotify) free_cache_item);
3199   if (cache_tree == NULL)
3200   {
3201     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3202     goto error;
3203   }
3204
3205   return write_pidfile (pid_fd);
3206
3207 error:
3208   remove_pidfile();
3209   return -1;
3210 } /* }}} int daemonize */
3211
3212 static int cleanup (void) /* {{{ */
3213 {
3214   pthread_cond_broadcast (&flush_cond);
3215   pthread_join (flush_thread, NULL);
3216
3217   pthread_cond_broadcast (&queue_cond);
3218   for (int i = 0; i < config_queue_threads; i++)
3219     pthread_join (queue_threads[i], NULL);
3220
3221   if (config_flush_at_shutdown)
3222   {
3223     assert(cache_queue_head == NULL);
3224     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3225   }
3226
3227   free(queue_threads);
3228   free(config_base_dir);
3229
3230   pthread_mutex_lock(&cache_lock);
3231   g_tree_destroy(cache_tree);
3232
3233   pthread_mutex_lock(&journal_lock);
3234   journal_done();
3235
3236   RRDD_LOG(LOG_INFO, "goodbye");
3237   closelog ();
3238
3239   remove_pidfile ();
3240   free(config_pid_file);
3241
3242   return (0);
3243 } /* }}} int cleanup */
3244
3245 static int read_options (int argc, char **argv) /* {{{ */
3246 {
3247   int option;
3248   int status = 0;
3249
3250   socket_permission_clear (&default_socket);
3251
3252   default_socket.socket_group = (gid_t)-1;
3253   default_socket.socket_permissions = (mode_t)-1;
3254
3255   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3256   {
3257     switch (option)
3258     {
3259       case 'O':
3260         opt_no_overwrite = 1;
3261         break;
3262
3263       case 'g':
3264         stay_foreground=1;
3265         break;
3266
3267       case 'l':
3268       {
3269         listen_socket_t *new;
3270
3271         new = malloc(sizeof(listen_socket_t));
3272         if (new == NULL)
3273         {
3274           fprintf(stderr, "read_options: malloc failed.\n");
3275           return(2);
3276         }
3277         memset(new, 0, sizeof(listen_socket_t));
3278
3279         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3280
3281         /* Add permissions to the socket {{{ */
3282         if (default_socket.permissions != 0)
3283         {
3284           socket_permission_copy (new, &default_socket);
3285         }
3286         else /* if (default_socket.permissions == 0) */
3287         {
3288           /* Add permission for ALL commands to the socket. */
3289           socket_permission_set_all (new);
3290         }
3291         /* }}} Done adding permissions. */
3292
3293         new->socket_group = default_socket.socket_group;
3294         new->socket_permissions = default_socket.socket_permissions;
3295
3296         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3297                          &config_listen_address_list_len, new))
3298         {
3299           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3300           return (2);
3301         }
3302       }
3303       break;
3304
3305       /* set socket group permissions */
3306       case 's':
3307       {
3308         gid_t group_gid;
3309         struct group *grp;
3310
3311         group_gid = strtoul(optarg, NULL, 10);
3312         if (errno != EINVAL && group_gid>0)
3313         {
3314           /* we were passed a number */
3315           grp = getgrgid(group_gid);
3316         }
3317         else
3318         {
3319           grp = getgrnam(optarg);
3320         }
3321
3322         if (grp)
3323         {
3324           default_socket.socket_group = grp->gr_gid;
3325         }
3326         else
3327         {
3328           /* no idea what the user wanted... */
3329           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3330           return (5);
3331         }
3332       }
3333       break;
3334
3335       /* set socket file permissions */
3336       case 'm':
3337       {
3338         long  tmp;
3339         char *endptr = NULL;
3340
3341         tmp = strtol (optarg, &endptr, 8);
3342         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3343             || (tmp > 07777) || (tmp < 0)) {
3344           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3345               optarg);
3346           return (5);
3347         }
3348
3349         default_socket.socket_permissions = (mode_t)tmp;
3350       }
3351       break;
3352
3353       case 'P':
3354       {
3355         char *optcopy;
3356         char *saveptr;
3357         char *dummy;
3358         char *ptr;
3359
3360         socket_permission_clear (&default_socket);
3361
3362         optcopy = strdup (optarg);
3363         dummy = optcopy;
3364         saveptr = NULL;
3365         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3366         {
3367           dummy = NULL;
3368           status = socket_permission_add (&default_socket, ptr);
3369           if (status != 0)
3370           {
3371             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3372                 "socket failed. Most likely, this permission doesn't "
3373                 "exist. Check your command line.\n", ptr);
3374             status = 4;
3375           }
3376         }
3377
3378         free (optcopy);
3379       }
3380       break;
3381
3382       case 'f':
3383       {
3384         int temp;
3385
3386         temp = atoi (optarg);
3387         if (temp > 0)
3388           config_flush_interval = temp;
3389         else
3390         {
3391           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3392           status = 3;
3393         }
3394       }
3395       break;
3396
3397       case 'w':
3398       {
3399         int temp;
3400
3401         temp = atoi (optarg);
3402         if (temp > 0)
3403           config_write_interval = temp;
3404         else
3405         {
3406           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3407           status = 2;
3408         }
3409       }
3410       break;
3411
3412       case 'z':
3413       {
3414         int temp;
3415
3416         temp = atoi(optarg);
3417         if (temp > 0)
3418           config_write_jitter = temp;
3419         else
3420         {
3421           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3422           status = 2;
3423         }
3424
3425         break;
3426       }
3427
3428       case 't':
3429       {
3430         int threads;
3431         threads = atoi(optarg);
3432         if (threads >= 1)
3433           config_queue_threads = threads;
3434         else
3435         {
3436           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3437           return 1;
3438         }
3439       }
3440       break;
3441
3442       case 'B':
3443         config_write_base_only = 1;
3444         break;
3445
3446       case 'b':
3447       {
3448         size_t len;
3449         char base_realpath[PATH_MAX];
3450
3451         if (config_base_dir != NULL)
3452           free (config_base_dir);
3453         config_base_dir = strdup (optarg);
3454         if (config_base_dir == NULL)
3455         {
3456           fprintf (stderr, "read_options: strdup failed.\n");
3457           return (3);
3458         }
3459
3460         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3461         {
3462           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3463               config_base_dir, rrd_strerror (errno));
3464           return (3);
3465         }
3466
3467         /* make sure that the base directory is not resolved via
3468          * symbolic links.  this makes some performance-enhancing
3469          * assumptions possible (we don't have to resolve paths
3470          * that start with a "/")
3471          */
3472         if (realpath(config_base_dir, base_realpath) == NULL)
3473         {
3474           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3475               "%s\n", config_base_dir, rrd_strerror(errno));
3476           return 5;
3477         }
3478
3479         len = strlen (config_base_dir);
3480         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3481         {
3482           config_base_dir[len - 1] = 0;
3483           len--;
3484         }
3485
3486         if (len < 1)
3487         {
3488           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3489           return (4);
3490         }
3491
3492         _config_base_dir_len = len;
3493
3494         len = strlen (base_realpath);
3495         while ((len > 0) && (base_realpath[len - 1] == '/'))
3496         {
3497           base_realpath[len - 1] = '\0';
3498           len--;
3499         }
3500
3501         if (strncmp(config_base_dir,
3502                          base_realpath, sizeof(base_realpath)) != 0)
3503         {
3504           fprintf(stderr,
3505                   "Base directory (-b) resolved via file system links!\n"
3506                   "Please consult rrdcached '-b' documentation!\n"
3507                   "Consider specifying the real directory (%s)\n",
3508                   base_realpath);
3509           return 5;
3510         }
3511       }
3512       break;
3513
3514       case 'p':
3515       {
3516         if (config_pid_file != NULL)
3517           free (config_pid_file);
3518         config_pid_file = strdup (optarg);
3519         if (config_pid_file == NULL)
3520         {
3521           fprintf (stderr, "read_options: strdup failed.\n");
3522           return (3);
3523         }
3524       }
3525       break;
3526
3527       case 'F':
3528         config_flush_at_shutdown = 1;
3529         break;
3530
3531       case 'j':
3532       {
3533         char journal_dir_actual[PATH_MAX];
3534         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3535         if (journal_dir)
3536         {
3537           // if we were able to properly resolve the path, lets have a copy
3538           // for use outside this block.
3539           journal_dir = strdup(journal_dir);           
3540           status = rrd_mkdir_p(journal_dir, 0777);
3541           if (status != 0)
3542           {
3543             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3544                     journal_dir, rrd_strerror(errno));
3545             return 6;
3546           }
3547           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3548           {
3549             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3550                     errno ? rrd_strerror(errno) : "");
3551             return 6;
3552           }
3553         } else {
3554           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3555                   errno ? rrd_strerror(errno) : "");
3556           return 6;
3557         }
3558       }
3559       break;
3560
3561       case 'a':
3562       {
3563         int temp = atoi(optarg);
3564         if (temp > 0)
3565           config_alloc_chunk = temp;
3566         else
3567         {
3568           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3569           return 10;
3570         }
3571       }
3572       break;
3573
3574       case 'h':
3575       case '?':
3576         printf ("RRDCacheD %s\n"
3577             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3578             "\n"
3579             "Usage: rrdcached [options]\n"
3580             "\n"
3581             "Valid options are:\n"
3582             "  -l <address>  Socket address to listen to.\n"
3583             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3584             "  -P <perms>    Sets the permissions to assign to all following "
3585                             "sockets\n"
3586             "  -w <seconds>  Interval in which to write data.\n"
3587             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3588             "  -t <threads>  Number of write threads.\n"
3589             "  -f <seconds>  Interval in which to flush dead data.\n"
3590             "  -p <file>     Location of the PID-file.\n"
3591             "  -b <dir>      Base directory to change to.\n"
3592             "  -B            Restrict file access to paths within -b <dir>\n"
3593             "  -g            Do not fork and run in the foreground.\n"
3594             "  -j <dir>      Directory in which to create the journal files.\n"
3595             "  -F            Always flush all updates at shutdown\n"
3596             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3597             "                (the socket will also have read/write permissions "
3598                             "for that group)\n"
3599             "  -m <mode>     File permissions (octal) of all following UNIX "
3600                             "sockets\n"
3601             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3602             "  -O            Do not allow CREATE commands to overwrite existing\n"
3603             "                files, even if asked to.\n"
3604             "\n"
3605             "For more information and a detailed description of all options "
3606             "please refer\n"
3607             "to the rrdcached(1) manual page.\n",
3608             VERSION);
3609         if (option == 'h')
3610           status = -1;
3611         else
3612           status = 1;
3613         break;
3614     } /* switch (option) */
3615   } /* while (getopt) */
3616
3617   /* advise the user when values are not sane */
3618   if (config_flush_interval < 2 * config_write_interval)
3619     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3620             " 2x write interval (-w) !\n");
3621   if (config_write_jitter > config_write_interval)
3622     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3623             " write interval (-w) !\n");
3624
3625   if (config_write_base_only && config_base_dir == NULL)
3626     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3627             "  Consult the rrdcached documentation\n");
3628
3629   if (journal_dir == NULL)
3630     config_flush_at_shutdown = 1;
3631
3632   return (status);
3633 } /* }}} int read_options */
3634
3635 int main (int argc, char **argv)
3636 {
3637   int status;
3638
3639   status = read_options (argc, argv);
3640   if (status != 0)
3641   {
3642     if (status < 0)
3643       status = 0;
3644     return (status);
3645   }
3646
3647   status = daemonize ();
3648   if (status != 0)
3649   {
3650     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3651     return (1);
3652   }
3653
3654   journal_init();
3655
3656   /* start the queue threads */
3657   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3658   if (queue_threads == NULL)
3659   {
3660     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3661     cleanup();
3662     return (1);
3663   }
3664   for (int i = 0; i < config_queue_threads; i++)
3665   {
3666     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3667     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3668     if (status != 0)
3669     {
3670       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3671       cleanup();
3672       return (1);
3673     }
3674   }
3675
3676   /* start the flush thread */
3677   memset(&flush_thread, 0, sizeof(flush_thread));
3678   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3679   if (status != 0)
3680   {
3681     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3682     cleanup();
3683     return (1);
3684   }
3685
3686   listen_thread_main (NULL);
3687   cleanup ();
3688
3689   return (0);
3690 } /* int main */
3691
3692 /*
3693  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3694  */