add extra space to LDFLAGS when building ruby bindings ... otherwhise this will break...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd_tool.h"
75 #include "rrd_client.h"
76 #include "unused.h"
77
78 #include <stdlib.h>
79
80 #ifndef WIN32
81 #ifdef HAVE_STDINT_H
82 #  include <stdint.h>
83 #endif
84 #include <unistd.h>
85 #include <strings.h>
86 #include <inttypes.h>
87 #include <sys/socket.h>
88
89 #else
90
91 #endif
92 #include <stdio.h>
93 #include <string.h>
94
95 #include <sys/types.h>
96 #include <sys/stat.h>
97 #include <dirent.h>
98 #include <fcntl.h>
99 #include <signal.h>
100 #include <sys/un.h>
101 #include <netdb.h>
102 #include <poll.h>
103 #include <syslog.h>
104 #include <pthread.h>
105 #include <errno.h>
106 #include <assert.h>
107 #include <sys/time.h>
108 #include <time.h>
109 #include <libgen.h>
110 #include <grp.h>
111
112 #ifdef HAVE_LIBWRAP
113 #include <tcpd.h>
114 #endif /* HAVE_LIBWRAP */
115
116 #include <glib-2.0/glib.h>
117 /* }}} */
118
119 #define RRDD_LOG(severity, ...) \
120   do { \
121     if (stay_foreground) { \
122       fprintf(stderr, __VA_ARGS__); \
123       fprintf(stderr, "\n"); } \
124     syslog ((severity), __VA_ARGS__); \
125   } while (0)
126
127 /*
128  * Types
129  */
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
131
132 struct listen_socket_s
133 {
134   int fd;
135   char addr[PATH_MAX + 1];
136   int family;
137
138   /* state for BATCH processing */
139   time_t batch_start;
140   int batch_cmd;
141
142   /* buffered IO */
143   char *rbuf;
144   off_t next_cmd;
145   off_t next_read;
146
147   char *wbuf;
148   ssize_t wbuf_len;
149
150   uint32_t permissions;
151
152   gid_t  socket_group;
153   mode_t socket_permissions;
154 };
155 typedef struct listen_socket_s listen_socket_t;
156
157 struct command_s;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
161                         time_t UNUSED(now),\
162                         char  UNUSED(*buffer),\
163                         size_t UNUSED(buffer_size)
164
165 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
166                         DISPATCH_PROTO
167
168 struct command_s {
169   char   *cmd;
170   int (*handler)(HANDLER_PROTO);
171
172   char  context;                /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT      (1<<0)
174 #define CMD_CONTEXT_BATCH       (1<<1)
175 #define CMD_CONTEXT_JOURNAL     (1<<2)
176 #define CMD_CONTEXT_ANY         (0x7f)
177
178   char *syntax;
179   char *help;
180 };
181
182 struct cache_item_s;
183 typedef struct cache_item_s cache_item_t;
184 struct cache_item_s
185 {
186   char *file;
187   char **values;
188   size_t values_num;            /* number of valid pointers */
189   size_t values_alloc;          /* number of allocated pointers */
190   time_t last_flush_time;
191   time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE  (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
194   int flags;
195   pthread_cond_t  flushed;
196   cache_item_t *prev;
197   cache_item_t *next;
198 };
199
200 struct callback_flush_data_s
201 {
202   time_t now;
203   time_t abs_timeout;
204   char **keys;
205   size_t keys_num;
206 };
207 typedef struct callback_flush_data_s callback_flush_data_t;
208
209 enum queue_side_e
210 {
211   HEAD,
212   TAIL
213 };
214 typedef enum queue_side_e queue_side_t;
215
216 /* describe a set of journal files */
217 typedef struct {
218   char **files;
219   size_t files_num;
220 } journal_set;
221
222 /* max length of socket command or response */
223 #define CMD_MAX 4096
224 #define RBUF_SIZE (CMD_MAX*2)
225
226 /*
227  * Variables
228  */
229 static int stay_foreground = 0;
230 static uid_t daemon_uid;
231
232 static listen_socket_t *listen_fds = NULL;
233 static size_t listen_fds_num = 0;
234
235 static listen_socket_t default_socket;
236
237 enum {
238   RUNNING,              /* normal operation */
239   FLUSHING,             /* flushing remaining values */
240   SHUTDOWN              /* shutting down */
241 } state = RUNNING;
242
243 static pthread_t *queue_threads;
244 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
245 static int config_queue_threads = 4;
246
247 static pthread_t flush_thread;
248 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
249
250 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
251 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
252 static int connection_threads_num = 0;
253
254 /* Cache stuff */
255 static GTree          *cache_tree = NULL;
256 static cache_item_t   *cache_queue_head = NULL;
257 static cache_item_t   *cache_queue_tail = NULL;
258 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
259
260 static int config_write_interval = 300;
261 static int config_write_jitter   = 0;
262 static int config_flush_interval = 3600;
263 static int config_flush_at_shutdown = 0;
264 static char *config_pid_file = NULL;
265 static char *config_base_dir = NULL;
266 static size_t _config_base_dir_len = 0;
267 static int config_write_base_only = 0;
268 static size_t config_alloc_chunk = 1;
269
270 static listen_socket_t **config_listen_address_list = NULL;
271 static size_t config_listen_address_list_len = 0;
272
273 static uint64_t stats_queue_length = 0;
274 static uint64_t stats_updates_received = 0;
275 static uint64_t stats_flush_received = 0;
276 static uint64_t stats_updates_written = 0;
277 static uint64_t stats_data_sets_written = 0;
278 static uint64_t stats_journal_bytes = 0;
279 static uint64_t stats_journal_rotate = 0;
280 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
281
282 static int opt_no_overwrite = 0; /* default for the daemon */
283
284 /* Journaled updates */
285 #define JOURNAL_REPLAY(s) ((s) == NULL)
286 #define JOURNAL_BASE "rrd.journal"
287 static journal_set *journal_cur = NULL;
288 static journal_set *journal_old = NULL;
289 static char *journal_dir = NULL;
290 static FILE *journal_fh = NULL;         /* current journal file handle */
291 static long  journal_size = 0;          /* current journal size */
292 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
293 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
294 static int journal_write(char *cmd, char *args);
295 static void journal_done(void);
296 static void journal_rotate(void);
297
298 /* prototypes for forward refernces */
299 static int handle_request_help (HANDLER_PROTO);
300
301 /* 
302  * Functions
303  */
304 static void sig_common (const char *sig) /* {{{ */
305 {
306   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
307   state = FLUSHING;
308   pthread_cond_broadcast(&flush_cond);
309   pthread_cond_broadcast(&queue_cond);
310 } /* }}} void sig_common */
311
312 static void sig_int_handler (int UNUSED(s)) /* {{{ */
313 {
314   sig_common("INT");
315 } /* }}} void sig_int_handler */
316
317 static void sig_term_handler (int UNUSED(s)) /* {{{ */
318 {
319   sig_common("TERM");
320 } /* }}} void sig_term_handler */
321
322 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
323 {
324   config_flush_at_shutdown = 1;
325   sig_common("USR1");
326 } /* }}} void sig_usr1_handler */
327
328 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
329 {
330   config_flush_at_shutdown = 0;
331   sig_common("USR2");
332 } /* }}} void sig_usr2_handler */
333
334 static void install_signal_handlers(void) /* {{{ */
335 {
336   /* These structures are static, because `sigaction' behaves weird if the are
337    * overwritten.. */
338   static struct sigaction sa_int;
339   static struct sigaction sa_term;
340   static struct sigaction sa_pipe;
341   static struct sigaction sa_usr1;
342   static struct sigaction sa_usr2;
343
344   /* Install signal handlers */
345   memset (&sa_int, 0, sizeof (sa_int));
346   sa_int.sa_handler = sig_int_handler;
347   sigaction (SIGINT, &sa_int, NULL);
348
349   memset (&sa_term, 0, sizeof (sa_term));
350   sa_term.sa_handler = sig_term_handler;
351   sigaction (SIGTERM, &sa_term, NULL);
352
353   memset (&sa_pipe, 0, sizeof (sa_pipe));
354   sa_pipe.sa_handler = SIG_IGN;
355   sigaction (SIGPIPE, &sa_pipe, NULL);
356
357   memset (&sa_pipe, 0, sizeof (sa_usr1));
358   sa_usr1.sa_handler = sig_usr1_handler;
359   sigaction (SIGUSR1, &sa_usr1, NULL);
360
361   memset (&sa_usr2, 0, sizeof (sa_usr2));
362   sa_usr2.sa_handler = sig_usr2_handler;
363   sigaction (SIGUSR2, &sa_usr2, NULL);
364
365 } /* }}} void install_signal_handlers */
366
367 static int open_pidfile(char *action, int oflag) /* {{{ */
368 {
369   int fd;
370   const char *file;
371   char *file_copy, *dir;
372
373   file = (config_pid_file != NULL)
374     ? config_pid_file
375     : LOCALSTATEDIR "/run/rrdcached.pid";
376
377   /* dirname may modify its argument */
378   file_copy = strdup(file);
379   if (file_copy == NULL)
380   {
381     fprintf(stderr, "rrdcached: strdup(): %s\n",
382         rrd_strerror(errno));
383     return -1;
384   }
385
386   dir = dirname(file_copy);
387   if (rrd_mkdir_p(dir, 0777) != 0)
388   {
389     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390         dir, rrd_strerror(errno));
391     return -1;
392   }
393
394   free(file_copy);
395
396   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
397   if (fd < 0)
398     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399             action, file, rrd_strerror(errno));
400
401   return(fd);
402 } /* }}} static int open_pidfile */
403
404 /* check existing pid file to see whether a daemon is running */
405 static int check_pidfile(void)
406 {
407   int pid_fd;
408   pid_t pid;
409   char pid_str[16];
410
411   pid_fd = open_pidfile("open", O_RDWR);
412   if (pid_fd < 0)
413     return pid_fd;
414
415   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
416     return -1;
417
418   pid = atoi(pid_str);
419   if (pid <= 0)
420     return -1;
421
422   /* another running process that we can signal COULD be
423    * a competing rrdcached */
424   if (pid != getpid() && kill(pid, 0) == 0)
425   {
426     fprintf(stderr,
427             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
428     close(pid_fd);
429     return -1;
430   }
431
432   lseek(pid_fd, 0, SEEK_SET);
433   if (ftruncate(pid_fd, 0) == -1)
434   {
435     fprintf(stderr,
436             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
437     close(pid_fd);
438     return -1;
439   }
440
441   fprintf(stderr,
442           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
443           "rrdcached: starting normally.\n", pid);
444
445   return pid_fd;
446 } /* }}} static int check_pidfile */
447
448 static int write_pidfile (int fd) /* {{{ */
449 {
450   pid_t pid;
451   FILE *fh;
452
453   pid = getpid ();
454
455   fh = fdopen (fd, "w");
456   if (fh == NULL)
457   {
458     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
459     close(fd);
460     return (-1);
461   }
462
463   fprintf (fh, "%i\n", (int) pid);
464   fclose (fh);
465
466   return (0);
467 } /* }}} int write_pidfile */
468
469 static int remove_pidfile (void) /* {{{ */
470 {
471   char *file;
472   int status;
473
474   file = (config_pid_file != NULL)
475     ? config_pid_file
476     : LOCALSTATEDIR "/run/rrdcached.pid";
477
478   status = unlink (file);
479   if (status == 0)
480     return (0);
481   return (errno);
482 } /* }}} int remove_pidfile */
483
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
485 {
486   char *eol;
487
488   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489                sock->next_read - sock->next_cmd);
490
491   if (eol == NULL)
492   {
493     /* no commands left, move remainder back to front of rbuf */
494     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495             sock->next_read - sock->next_cmd);
496     sock->next_read -= sock->next_cmd;
497     sock->next_cmd = 0;
498     *len = 0;
499     return NULL;
500   }
501   else
502   {
503     char *cmd = sock->rbuf + sock->next_cmd;
504     *eol = '\0';
505
506     sock->next_cmd = eol - sock->rbuf + 1;
507
508     if (eol > sock->rbuf && *(eol-1) == '\r')
509       *(--eol) = '\0'; /* handle "\r\n" EOL */
510
511     *len = eol - cmd;
512
513     return cmd;
514   }
515
516   /* NOTREACHED */
517   assert(1==0);
518 } /* }}} char *next_cmd */
519
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
522 {
523   char *new_buf;
524
525   assert(sock != NULL);
526
527   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
528   if (new_buf == NULL)
529   {
530     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
531     return -1;
532   }
533
534   strncpy(new_buf + sock->wbuf_len, str, len + 1);
535
536   sock->wbuf = new_buf;
537   sock->wbuf_len += len;
538
539   return 0;
540 } /* }}} static int add_to_wbuf */
541
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
544 {
545   va_list argp;
546   char buffer[CMD_MAX];
547   int len;
548
549   if (JOURNAL_REPLAY(sock)) return 0;
550   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
551
552   va_start(argp, fmt);
553 #ifdef HAVE_VSNPRINTF
554   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
555 #else
556   len = vsprintf(buffer, fmt, argp);
557 #endif
558   va_end(argp);
559   if (len < 0)
560   {
561     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
562     return -1;
563   }
564
565   return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
567
568 static int count_lines(char *str) /* {{{ */
569 {
570   int lines = 0;
571
572   if (str != NULL)
573   {
574     while ((str = strchr(str, '\n')) != NULL)
575     {
576       ++lines;
577       ++str;
578     }
579   }
580
581   return lines;
582 } /* }}} static int count_lines */
583
584 /* send the response back to the user.
585  * returns 0 on success, -1 on error
586  * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588                           char *fmt, ...) /* {{{ */
589 {
590   va_list argp;
591   char buffer[CMD_MAX];
592   int lines;
593   ssize_t wrote;
594   int rclen, len;
595
596   if (JOURNAL_REPLAY(sock)) return rc;
597
598   if (sock->batch_start)
599   {
600     if (rc == RESP_OK)
601       return rc; /* no response on success during BATCH */
602     lines = sock->batch_cmd;
603   }
604   else if (rc == RESP_OK)
605     lines = count_lines(sock->wbuf);
606   else
607     lines = -1;
608
609   rclen = sprintf(buffer, "%d ", lines);
610   va_start(argp, fmt);
611 #ifdef HAVE_VSNPRINTF
612   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
613 #else
614   len = vsprintf(buffer+rclen, fmt, argp);
615 #endif
616   va_end(argp);
617   if (len < 0)
618     return -1;
619
620   len += rclen;
621
622   /* append the result to the wbuf, don't write to the user */
623   if (sock->batch_start)
624     return add_to_wbuf(sock, buffer, len);
625
626   /* first write must be complete */
627   if (len != write(sock->fd, buffer, len))
628   {
629     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
630     return -1;
631   }
632
633   if (sock->wbuf != NULL && rc == RESP_OK)
634   {
635     wrote = 0;
636     while (wrote < sock->wbuf_len)
637     {
638       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
639       if (wb <= 0)
640       {
641         RRDD_LOG(LOG_INFO, "send_response: could not write results");
642         return -1;
643       }
644       wrote += wb;
645     }
646   }
647
648   free(sock->wbuf); sock->wbuf = NULL;
649   sock->wbuf_len = 0;
650
651   return 0;
652 } /* }}} */
653
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
655 {
656   ci->values = NULL;
657   ci->values_num = 0;
658   ci->values_alloc = 0;
659
660   ci->last_flush_time = when;
661   if (config_write_jitter > 0)
662     ci->last_flush_time += (rrd_random() % config_write_jitter);
663 }
664
665 /* remove_from_queue
666  * remove a "cache_item_t" item from the queue.
667  * must hold 'cache_lock' when calling this
668  */
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
670 {
671   if (ci == NULL) return;
672   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
673
674   if (ci->prev == NULL)
675     cache_queue_head = ci->next; /* reset head */
676   else
677     ci->prev->next = ci->next;
678
679   if (ci->next == NULL)
680     cache_queue_tail = ci->prev; /* reset the tail */
681   else
682     ci->next->prev = ci->prev;
683
684   ci->next = ci->prev = NULL;
685   ci->flags &= ~CI_FLAGS_IN_QUEUE;
686
687   pthread_mutex_lock (&stats_lock);
688   assert (stats_queue_length > 0);
689   stats_queue_length--;
690   pthread_mutex_unlock (&stats_lock);
691
692 } /* }}} static void remove_from_queue */
693
694 /* free the resources associated with the cache_item_t
695  * must hold cache_lock when calling this function
696  */
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
698 {
699   if (ci == NULL) return NULL;
700
701   remove_from_queue(ci);
702
703   for (size_t i=0; i < ci->values_num; i++)
704     free(ci->values[i]);
705
706   free (ci->values);
707   free (ci->file);
708
709   /* in case anyone is waiting */
710   pthread_cond_broadcast(&ci->flushed);
711   pthread_cond_destroy(&ci->flushed);
712
713   free (ci);
714
715   return NULL;
716 } /* }}} static void *free_cache_item */
717
718 /*
719  * enqueue_cache_item:
720  * `cache_lock' must be acquired before calling this function!
721  */
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
723     queue_side_t side)
724 {
725   if (ci == NULL)
726     return (-1);
727
728   if (ci->values_num == 0)
729     return (0);
730
731   if (side == HEAD)
732   {
733     if (cache_queue_head == ci)
734       return 0;
735
736     /* remove if further down in queue */
737     remove_from_queue(ci);
738
739     ci->prev = NULL;
740     ci->next = cache_queue_head;
741     if (ci->next != NULL)
742       ci->next->prev = ci;
743     cache_queue_head = ci;
744
745     if (cache_queue_tail == NULL)
746       cache_queue_tail = cache_queue_head;
747   }
748   else /* (side == TAIL) */
749   {
750     /* We don't move values back in the list.. */
751     if (ci->flags & CI_FLAGS_IN_QUEUE)
752       return (0);
753
754     assert (ci->next == NULL);
755     assert (ci->prev == NULL);
756
757     ci->prev = cache_queue_tail;
758
759     if (cache_queue_tail == NULL)
760       cache_queue_head = ci;
761     else
762       cache_queue_tail->next = ci;
763
764     cache_queue_tail = ci;
765   }
766
767   ci->flags |= CI_FLAGS_IN_QUEUE;
768
769   pthread_cond_signal(&queue_cond);
770   pthread_mutex_lock (&stats_lock);
771   stats_queue_length++;
772   pthread_mutex_unlock (&stats_lock);
773
774   return (0);
775 } /* }}} int enqueue_cache_item */
776
777 /*
778  * tree_callback_flush:
779  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780  * while this is in progress.
781  */
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
783     gpointer data)
784 {
785   cache_item_t *ci;
786   callback_flush_data_t *cfd;
787
788   ci = (cache_item_t *) value;
789   cfd = (callback_flush_data_t *) data;
790
791   if (ci->flags & CI_FLAGS_IN_QUEUE)
792     return FALSE;
793
794   if (ci->values_num > 0
795       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
796   {
797     enqueue_cache_item (ci, TAIL);
798   }
799   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800       && (ci->values_num <= 0))
801   {
802     assert ((char *) key == ci->file);
803     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
804     {
805       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
806       return (FALSE);
807     }
808   }
809
810   return (FALSE);
811 } /* }}} gboolean tree_callback_flush */
812
813 static int flush_old_values (int max_age)
814 {
815   callback_flush_data_t cfd;
816   size_t k;
817
818   memset (&cfd, 0, sizeof (cfd));
819   /* Pass the current time as user data so that we don't need to call
820    * `time' for each node. */
821   cfd.now = time (NULL);
822   cfd.keys = NULL;
823   cfd.keys_num = 0;
824
825   if (max_age > 0)
826     cfd.abs_timeout = cfd.now - max_age;
827   else
828     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
829
830   /* `tree_callback_flush' will return the keys of all values that haven't
831    * been touched in the last `config_flush_interval' seconds in `cfd'.
832    * The char*'s in this array point to the same memory as ci->file, so we
833    * don't need to free them separately. */
834   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
835
836   for (k = 0; k < cfd.keys_num; k++)
837   {
838     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839     /* should never fail, since we have held the cache_lock
840      * the entire time */
841     assert(status == TRUE);
842   }
843
844   if (cfd.keys != NULL)
845   {
846     free (cfd.keys);
847     cfd.keys = NULL;
848   }
849
850   return (0);
851 } /* int flush_old_values */
852
853 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
854 {
855   struct timeval now;
856   struct timespec next_flush;
857   int status;
858
859   gettimeofday (&now, NULL);
860   next_flush.tv_sec = now.tv_sec + config_flush_interval;
861   next_flush.tv_nsec = 1000 * now.tv_usec;
862
863   pthread_mutex_lock(&cache_lock);
864
865   while (state == RUNNING)
866   {
867     gettimeofday (&now, NULL);
868     if ((now.tv_sec > next_flush.tv_sec)
869         || ((now.tv_sec == next_flush.tv_sec)
870           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
871     {
872       RRDD_LOG(LOG_DEBUG, "flushing old values");
873
874       /* Determine the time of the next cache flush. */
875       next_flush.tv_sec = now.tv_sec + config_flush_interval;
876
877       /* Flush all values that haven't been written in the last
878        * `config_write_interval' seconds. */
879       flush_old_values (config_write_interval);
880
881       /* unlock the cache while we rotate so we don't block incoming
882        * updates if the fsync() blocks on disk I/O */
883       pthread_mutex_unlock(&cache_lock);
884       journal_rotate();
885       pthread_mutex_lock(&cache_lock);
886     }
887
888     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
889     if (status != 0 && status != ETIMEDOUT)
890     {
891       RRDD_LOG (LOG_ERR, "flush_thread_main: "
892                 "pthread_cond_timedwait returned %i.", status);
893     }
894   }
895
896   if (config_flush_at_shutdown)
897     flush_old_values (-1); /* flush everything */
898
899   state = SHUTDOWN;
900
901   pthread_mutex_unlock(&cache_lock);
902
903   return NULL;
904 } /* void *flush_thread_main */
905
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
907 {
908   pthread_mutex_lock (&cache_lock);
909
910   while (state != SHUTDOWN
911          || (cache_queue_head != NULL && config_flush_at_shutdown))
912   {
913     cache_item_t *ci;
914     char *file;
915     char **values;
916     size_t values_num;
917     int status;
918
919     /* Now, check if there's something to store away. If not, wait until
920      * something comes in. */
921     if (cache_queue_head == NULL)
922     {
923       status = pthread_cond_wait (&queue_cond, &cache_lock);
924       if ((status != 0) && (status != ETIMEDOUT))
925       {
926         RRDD_LOG (LOG_ERR, "queue_thread_main: "
927             "pthread_cond_wait returned %i.", status);
928       }
929     }
930
931     /* Check if a value has arrived. This may be NULL if we timed out or there
932      * was an interrupt such as a signal. */
933     if (cache_queue_head == NULL)
934       continue;
935
936     ci = cache_queue_head;
937
938     /* copy the relevant parts */
939     file = strdup (ci->file);
940     if (file == NULL)
941     {
942       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
943       continue;
944     }
945
946     assert(ci->values != NULL);
947     assert(ci->values_num > 0);
948
949     values = ci->values;
950     values_num = ci->values_num;
951
952     wipe_ci_values(ci, time(NULL));
953     remove_from_queue(ci);
954
955     pthread_mutex_unlock (&cache_lock);
956
957     rrd_clear_error ();
958     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
959     if (status != 0)
960     {
961       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962           "rrd_update_r (%s) failed with status %i. (%s)",
963           file, status, rrd_get_error());
964     }
965
966     journal_write("wrote", file);
967
968     /* Search again in the tree.  It's possible someone issued a "FORGET"
969      * while we were writing the update values. */
970     pthread_mutex_lock(&cache_lock);
971     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
972     if (ci)
973       pthread_cond_broadcast(&ci->flushed);
974     pthread_mutex_unlock(&cache_lock);
975
976     if (status == 0)
977     {
978       pthread_mutex_lock (&stats_lock);
979       stats_updates_written++;
980       stats_data_sets_written += values_num;
981       pthread_mutex_unlock (&stats_lock);
982     }
983
984     rrd_free_ptrs((void ***) &values, &values_num);
985     free(file);
986
987     pthread_mutex_lock (&cache_lock);
988   }
989   pthread_mutex_unlock (&cache_lock);
990
991   return (NULL);
992 } /* }}} void *queue_thread_main */
993
994 static int buffer_get_field (char **buffer_ret, /* {{{ */
995     size_t *buffer_size_ret, char **field_ret)
996 {
997   char *buffer;
998   size_t buffer_pos;
999   size_t buffer_size;
1000   char *field;
1001   size_t field_size;
1002   int status;
1003
1004   buffer = *buffer_ret;
1005   buffer_pos = 0;
1006   buffer_size = *buffer_size_ret;
1007   field = *buffer_ret;
1008   field_size = 0;
1009
1010   if (buffer_size <= 0)
1011     return (-1);
1012
1013   /* This is ensured by `handle_request'. */
1014   assert (buffer[buffer_size - 1] == '\0');
1015
1016   status = -1;
1017   while (buffer_pos < buffer_size)
1018   {
1019     /* Check for end-of-field or end-of-buffer */
1020     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1021     {
1022       field[field_size] = 0;
1023       field_size++;
1024       buffer_pos++;
1025       status = 0;
1026       break;
1027     }
1028     /* Handle escaped characters. */
1029     else if (buffer[buffer_pos] == '\\')
1030     {
1031       if (buffer_pos >= (buffer_size - 1))
1032         break;
1033       buffer_pos++;
1034       field[field_size] = buffer[buffer_pos];
1035       field_size++;
1036       buffer_pos++;
1037     }
1038     /* Normal operation */ 
1039     else
1040     {
1041       field[field_size] = buffer[buffer_pos];
1042       field_size++;
1043       buffer_pos++;
1044     }
1045   } /* while (buffer_pos < buffer_size) */
1046
1047   if (status != 0)
1048     return (status);
1049
1050   *buffer_ret = buffer + buffer_pos;
1051   *buffer_size_ret = buffer_size - buffer_pos;
1052   *field_ret = field;
1053
1054   return (0);
1055 } /* }}} int buffer_get_field */
1056
1057 /* if we're restricting writes to the base directory,
1058  * check whether the file falls within the dir
1059  * returns 1 if OK, otherwise 0
1060  */
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1062 {
1063   assert(file != NULL);
1064
1065   if (!config_write_base_only
1066       || JOURNAL_REPLAY(sock)
1067       || config_base_dir == NULL)
1068     return 1;
1069
1070   if (strstr(file, "../") != NULL) goto err;
1071
1072   /* relative paths without "../" are ok */
1073   if (*file != '/') return 1;
1074
1075   /* file must be of the format base + "/" + <1+ char filename> */
1076   if (strlen(file) < _config_base_dir_len + 2) goto err;
1077   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078   if (*(file + _config_base_dir_len) != '/') goto err;
1079
1080   return 1;
1081
1082 err:
1083   if (sock != NULL && sock->fd >= 0)
1084     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1085
1086   return 0;
1087 } /* }}} static int check_file_access */
1088
1089 /* when using a base dir, convert relative paths to absolute paths.
1090  * if necessary, modifies the "filename" pointer to point
1091  * to the new path created in "tmp".  "tmp" is provided
1092  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1093  *
1094  * this allows us to optimize for the expected case (absolute path)
1095  * with a no-op.
1096  */
1097 static void get_abs_path(char **filename, char *tmp)
1098 {
1099   assert(tmp != NULL);
1100   assert(filename != NULL && *filename != NULL);
1101
1102   if (config_base_dir == NULL || **filename == '/')
1103     return;
1104
1105   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1106   *filename = tmp;
1107 } /* }}} static int get_abs_path */
1108
1109 static int flush_file (const char *filename) /* {{{ */
1110 {
1111   cache_item_t *ci;
1112
1113   pthread_mutex_lock (&cache_lock);
1114
1115   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1116   if (ci == NULL)
1117   {
1118     pthread_mutex_unlock (&cache_lock);
1119     return (ENOENT);
1120   }
1121
1122   if (ci->values_num > 0)
1123   {
1124     /* Enqueue at head */
1125     enqueue_cache_item (ci, HEAD);
1126     pthread_cond_wait(&ci->flushed, &cache_lock);
1127   }
1128
1129   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1130    * may have been purged during our cond_wait() */
1131
1132   pthread_mutex_unlock(&cache_lock);
1133
1134   return (0);
1135 } /* }}} int flush_file */
1136
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1138 {
1139   char *err = "Syntax error.\n";
1140
1141   if (cmd && cmd->syntax)
1142     err = cmd->syntax;
1143
1144   return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
1146
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1148 {
1149   uint64_t copy_queue_length;
1150   uint64_t copy_updates_received;
1151   uint64_t copy_flush_received;
1152   uint64_t copy_updates_written;
1153   uint64_t copy_data_sets_written;
1154   uint64_t copy_journal_bytes;
1155   uint64_t copy_journal_rotate;
1156
1157   uint64_t tree_nodes_number;
1158   uint64_t tree_depth;
1159
1160   pthread_mutex_lock (&stats_lock);
1161   copy_queue_length       = stats_queue_length;
1162   copy_updates_received   = stats_updates_received;
1163   copy_flush_received     = stats_flush_received;
1164   copy_updates_written    = stats_updates_written;
1165   copy_data_sets_written  = stats_data_sets_written;
1166   copy_journal_bytes      = stats_journal_bytes;
1167   copy_journal_rotate     = stats_journal_rotate;
1168   pthread_mutex_unlock (&stats_lock);
1169
1170   pthread_mutex_lock (&cache_lock);
1171   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1173   pthread_mutex_unlock (&cache_lock);
1174
1175   add_response_info(sock,
1176                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1177   add_response_info(sock,
1178                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179   add_response_info(sock,
1180                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181   add_response_info(sock,
1182                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183   add_response_info(sock,
1184                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1189
1190   send_response(sock, RESP_OK, "Statistics follow\n");
1191
1192   return (0);
1193 } /* }}} int handle_request_stats */
1194
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1196 {
1197   char *file, file_tmp[PATH_MAX];
1198   int status;
1199
1200   status = buffer_get_field (&buffer, &buffer_size, &file);
1201   if (status != 0)
1202   {
1203     return syntax_error(sock,cmd);
1204   }
1205   else
1206   {
1207     pthread_mutex_lock(&stats_lock);
1208     stats_flush_received++;
1209     pthread_mutex_unlock(&stats_lock);
1210
1211     get_abs_path(&file, file_tmp);
1212     if (!check_file_access(file, sock)) return 0;
1213
1214     status = flush_file (file);
1215     if (status == 0)
1216       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217     else if (status == ENOENT)
1218     {
1219       /* no file in our tree; see whether it exists at all */
1220       struct stat statbuf;
1221
1222       memset(&statbuf, 0, sizeof(statbuf));
1223       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1225       else
1226         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1227     }
1228     else if (status < 0)
1229       return send_response(sock, RESP_ERR, "Internal error.\n");
1230     else
1231       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232   }
1233
1234   /* NOTREACHED */
1235   assert(1==0);
1236 } /* }}} int handle_request_flush */
1237
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1239 {
1240   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1241
1242   pthread_mutex_lock(&cache_lock);
1243   flush_old_values(-1);
1244   pthread_mutex_unlock(&cache_lock);
1245
1246   return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
1248
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1250 {
1251   int status;
1252   char *file, file_tmp[PATH_MAX];
1253   cache_item_t *ci;
1254
1255   status = buffer_get_field(&buffer, &buffer_size, &file);
1256   if (status != 0)
1257     return syntax_error(sock,cmd);
1258
1259   get_abs_path(&file, file_tmp);
1260
1261   pthread_mutex_lock(&cache_lock);
1262   ci = g_tree_lookup(cache_tree, file);
1263   if (ci == NULL)
1264   {
1265     pthread_mutex_unlock(&cache_lock);
1266     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1267   }
1268
1269   for (size_t i=0; i < ci->values_num; i++)
1270     add_response_info(sock, "%s\n", ci->values[i]);
1271
1272   pthread_mutex_unlock(&cache_lock);
1273   return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
1275
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1277 {
1278   int status;
1279   gboolean found;
1280   char *file, file_tmp[PATH_MAX];
1281
1282   status = buffer_get_field(&buffer, &buffer_size, &file);
1283   if (status != 0)
1284     return syntax_error(sock,cmd);
1285
1286   get_abs_path(&file, file_tmp);
1287   if (!check_file_access(file, sock)) return 0;
1288
1289   pthread_mutex_lock(&cache_lock);
1290   found = g_tree_remove(cache_tree, file);
1291   pthread_mutex_unlock(&cache_lock);
1292
1293   if (found == TRUE)
1294   {
1295     if (!JOURNAL_REPLAY(sock))
1296       journal_write("forget", file);
1297
1298     return send_response(sock, RESP_OK, "Gone!\n");
1299   }
1300   else
1301     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1302
1303   /* NOTREACHED */
1304   assert(1==0);
1305 } /* }}} static int handle_request_forget */
1306
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1308 {
1309   cache_item_t *ci;
1310
1311   pthread_mutex_lock(&cache_lock);
1312
1313   ci = cache_queue_head;
1314   while (ci != NULL)
1315   {
1316     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1317     ci = ci->next;
1318   }
1319
1320   pthread_mutex_unlock(&cache_lock);
1321
1322   return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
1324
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1326 {
1327   char *file, file_tmp[PATH_MAX];
1328   int values_num = 0;
1329   int status;
1330   char orig_buf[CMD_MAX];
1331
1332   cache_item_t *ci;
1333
1334   /* save it for the journal later */
1335   if (!JOURNAL_REPLAY(sock))
1336     strncpy(orig_buf, buffer, buffer_size);
1337
1338   status = buffer_get_field (&buffer, &buffer_size, &file);
1339   if (status != 0)
1340     return syntax_error(sock,cmd);
1341
1342   pthread_mutex_lock(&stats_lock);
1343   stats_updates_received++;
1344   pthread_mutex_unlock(&stats_lock);
1345
1346   get_abs_path(&file, file_tmp);
1347   if (!check_file_access(file, sock)) return 0;
1348
1349   pthread_mutex_lock (&cache_lock);
1350   ci = g_tree_lookup (cache_tree, file);
1351
1352   if (ci == NULL) /* {{{ */
1353   {
1354     struct stat statbuf;
1355     cache_item_t *tmp;
1356
1357     /* don't hold the lock while we setup; stat(2) might block */
1358     pthread_mutex_unlock(&cache_lock);
1359
1360     memset (&statbuf, 0, sizeof (statbuf));
1361     status = stat (file, &statbuf);
1362     if (status != 0)
1363     {
1364       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1365
1366       status = errno;
1367       if (status == ENOENT)
1368         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1369       else
1370         return send_response(sock, RESP_ERR,
1371                              "stat failed with error %i.\n", status);
1372     }
1373     if (!S_ISREG (statbuf.st_mode))
1374       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1375
1376     if (access(file, R_OK|W_OK) != 0)
1377       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378                            file, rrd_strerror(errno));
1379
1380     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1381     if (ci == NULL)
1382     {
1383       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1384
1385       return send_response(sock, RESP_ERR, "malloc failed.\n");
1386     }
1387     memset (ci, 0, sizeof (cache_item_t));
1388
1389     ci->file = strdup (file);
1390     if (ci->file == NULL)
1391     {
1392       free (ci);
1393       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1394
1395       return send_response(sock, RESP_ERR, "strdup failed.\n");
1396     }
1397
1398     wipe_ci_values(ci, now);
1399     ci->flags = CI_FLAGS_IN_TREE;
1400     pthread_cond_init(&ci->flushed, NULL);
1401
1402     pthread_mutex_lock(&cache_lock);
1403
1404     /* another UPDATE might have added this entry in the meantime */
1405     tmp = g_tree_lookup (cache_tree, file);
1406     if (tmp == NULL)
1407       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1408     else
1409     {
1410       free_cache_item (ci);
1411       ci = tmp;
1412     }
1413
1414     /* state may have changed while we were unlocked */
1415     if (state == SHUTDOWN)
1416       return -1;
1417   } /* }}} */
1418   assert (ci != NULL);
1419
1420   /* don't re-write updates in replay mode */
1421   if (!JOURNAL_REPLAY(sock))
1422     journal_write("update", orig_buf);
1423
1424   while (buffer_size > 0)
1425   {
1426     char *value;
1427     time_t stamp;
1428     char *eostamp;
1429
1430     status = buffer_get_field (&buffer, &buffer_size, &value);
1431     if (status != 0)
1432     {
1433       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1434       break;
1435     }
1436
1437     /* make sure update time is always moving forward */
1438     stamp = strtol(value, &eostamp, 10);
1439     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "Cannot find timestamp in '%s'!\n", value);
1444     }
1445     else if (stamp <= ci->last_update_stamp)
1446     {
1447       pthread_mutex_unlock(&cache_lock);
1448       return send_response(sock, RESP_ERR,
1449                            "illegal attempt to update using time %ld when last"
1450                            " update time is %ld (minimum one second step)\n",
1451                            stamp, ci->last_update_stamp);
1452     }
1453     else
1454       ci->last_update_stamp = stamp;
1455
1456     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457                               &ci->values_alloc, config_alloc_chunk))
1458     {
1459       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1460       continue;
1461     }
1462
1463     values_num++;
1464   }
1465
1466   if (((now - ci->last_flush_time) >= config_write_interval)
1467       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468       && (ci->values_num > 0))
1469   {
1470     enqueue_cache_item (ci, TAIL);
1471   }
1472
1473   pthread_mutex_unlock (&cache_lock);
1474
1475   if (values_num < 1)
1476     return send_response(sock, RESP_ERR, "No values updated.\n");
1477   else
1478     return send_response(sock, RESP_OK,
1479                          "errors, enqueued %i value(s).\n", values_num);
1480
1481   /* NOTREACHED */
1482   assert(1==0);
1483
1484 } /* }}} int handle_request_update */
1485
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1487 {
1488   char *file, file_tmp[PATH_MAX];
1489   char *cf;
1490
1491   char *start_str;
1492   char *end_str;
1493   time_t start_tm;
1494   time_t end_tm;
1495
1496   unsigned long step;
1497   unsigned long ds_cnt;
1498   char **ds_namv;
1499   rrd_value_t *data;
1500
1501   int status;
1502   unsigned long i;
1503   time_t t;
1504   rrd_value_t *data_ptr;
1505
1506   file = NULL;
1507   cf = NULL;
1508   start_str = NULL;
1509   end_str = NULL;
1510
1511   /* Read the arguments */
1512   do /* while (0) */
1513   {
1514     status = buffer_get_field (&buffer, &buffer_size, &file);
1515     if (status != 0)
1516       break;
1517
1518     status = buffer_get_field (&buffer, &buffer_size, &cf);
1519     if (status != 0)
1520       break;
1521
1522     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1523     if (status != 0)
1524     {
1525       start_str = NULL;
1526       status = 0;
1527       break;
1528     }
1529
1530     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1531     if (status != 0)
1532     {
1533       end_str = NULL;
1534       status = 0;
1535       break;
1536     }
1537   } while (0);
1538
1539   if (status != 0)
1540     return (syntax_error(sock,cmd));
1541
1542   get_abs_path(&file, file_tmp);
1543   if (!check_file_access(file, sock)) return 0;
1544
1545   status = flush_file (file);
1546   if ((status != 0) && (status != ENOENT))
1547     return (send_response (sock, RESP_ERR,
1548           "flush_file (%s) failed with status %i.\n", file, status));
1549
1550   t = time (NULL); /* "now" */
1551
1552   /* Parse start time */
1553   if (start_str != NULL)
1554   {
1555     char *endptr;
1556     long value;
1557
1558     endptr = NULL;
1559     errno = 0;
1560     value = strtol (start_str, &endptr, /* base = */ 0);
1561     if ((endptr == start_str) || (errno != 0))
1562       return (send_response(sock, RESP_ERR,
1563             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1564             start_str));
1565
1566     if (value > 0)
1567       start_tm = (time_t) value;
1568     else
1569       start_tm = (time_t) (t + value);
1570   }
1571   else
1572   {
1573     start_tm = t - 86400;
1574   }
1575
1576   /* Parse end time */
1577   if (end_str != NULL)
1578   {
1579     char *endptr;
1580     long value;
1581
1582     endptr = NULL;
1583     errno = 0;
1584     value = strtol (end_str, &endptr, /* base = */ 0);
1585     if ((endptr == end_str) || (errno != 0))
1586       return (send_response(sock, RESP_ERR,
1587             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1588             end_str));
1589
1590     if (value > 0)
1591       end_tm = (time_t) value;
1592     else
1593       end_tm = (time_t) (t + value);
1594   }
1595   else
1596   {
1597     end_tm = t;
1598   }
1599
1600   step = -1;
1601   ds_cnt = 0;
1602   ds_namv = NULL;
1603   data = NULL;
1604
1605   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606       &ds_cnt, &ds_namv, &data);
1607   if (status != 0)
1608     return (send_response(sock, RESP_ERR,
1609           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1610
1611   add_response_info (sock, "FlushVersion: %lu\n", 1);
1612   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614   add_response_info (sock, "Step: %lu\n", step);
1615   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1616
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618     size_t str_len = strlen (str); \
1619     if ((buffer_fill + str_len) > sizeof (buffer)) \
1620       str_len = sizeof (buffer) - buffer_fill; \
1621     if (str_len > 0) { \
1622       strncpy (buffer + buffer_fill, str, str_len); \
1623       buffer_fill += str_len; \
1624       assert (buffer_fill <= sizeof (buffer)); \
1625       if (buffer_fill == sizeof (buffer)) \
1626         buffer[buffer_fill - 1] = 0; \
1627       else \
1628         buffer[buffer_fill] = 0; \
1629     } \
1630   } while (0)
1631
1632   { /* Add list of DS names */
1633     char linebuf[1024];
1634     size_t linebuf_fill;
1635
1636     memset (linebuf, 0, sizeof (linebuf));
1637     linebuf_fill = 0;
1638     for (i = 0; i < ds_cnt; i++)
1639     {
1640       if (i > 0)
1641         SSTRCAT (linebuf, " ", linebuf_fill);
1642       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643       rrd_freemem(ds_namv[i]);
1644     }
1645     rrd_freemem(ds_namv);
1646     add_response_info (sock, "DSName: %s\n", linebuf);
1647   }
1648
1649   /* Add the actual data */
1650   assert (step > 0);
1651   data_ptr = data;
1652   for (t = start_tm + step; t <= end_tm; t += step)
1653   {
1654     char linebuf[1024];
1655     size_t linebuf_fill;
1656     char tmp[128];
1657
1658     memset (linebuf, 0, sizeof (linebuf));
1659     linebuf_fill = 0;
1660     for (i = 0; i < ds_cnt; i++)
1661     {
1662       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663       tmp[sizeof (tmp) - 1] = 0;
1664       SSTRCAT (linebuf, tmp, linebuf_fill);
1665
1666       data_ptr++;
1667     }
1668
1669     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1670   } /* for (t) */
1671   rrd_freemem(data);
1672
1673   return (send_response (sock, RESP_OK, "Success\n"));
1674 #undef SSTRCAT
1675 } /* }}} int handle_request_fetch */
1676
1677 /* we came across a "WROTE" entry during journal replay.
1678  * throw away any values that we have accumulated for this file
1679  */
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1681 {
1682   cache_item_t *ci;
1683   const char *file = buffer;
1684
1685   pthread_mutex_lock(&cache_lock);
1686
1687   ci = g_tree_lookup(cache_tree, file);
1688   if (ci == NULL)
1689   {
1690     pthread_mutex_unlock(&cache_lock);
1691     return (0);
1692   }
1693
1694   if (ci->values)
1695     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1696
1697   wipe_ci_values(ci, now);
1698   remove_from_queue(ci);
1699
1700   pthread_mutex_unlock(&cache_lock);
1701   return (0);
1702 } /* }}} int handle_request_wrote */
1703
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1705 {
1706   char *file, file_tmp[PATH_MAX];
1707   int status;
1708   rrd_info_t *info;
1709
1710   /* obtain filename */
1711   status = buffer_get_field(&buffer, &buffer_size, &file);
1712   if (status != 0)
1713     return syntax_error(sock,cmd);
1714   /* get full pathname */
1715   get_abs_path(&file, file_tmp);
1716   if (!check_file_access(file, sock)) {
1717     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1718   }
1719   /* get data */
1720   rrd_clear_error ();
1721   info = rrd_info_r(file);
1722   if(!info) {
1723     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1724   }
1725   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726       switch (data->type) {
1727       case RD_I_VAL:
1728           if (isnan(data->value.u_val))
1729               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1730           else
1731               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1732           break;
1733       case RD_I_CNT:
1734           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1735           break;
1736       case RD_I_INT:
1737           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1738           break;
1739       case RD_I_STR:
1740           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1741           break;
1742       case RD_I_BLO:
1743           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744           break;
1745       }
1746   }
1747
1748   rrd_info_free(info);
1749
1750   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info  */
1752
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1754 {
1755   char *i, *file, file_tmp[PATH_MAX];
1756   int status;
1757   int idx;
1758   time_t t;
1759
1760   /* obtain filename */
1761   status = buffer_get_field(&buffer, &buffer_size, &file);
1762   if (status != 0)
1763     return syntax_error(sock,cmd);
1764   /* get full pathname */
1765   get_abs_path(&file, file_tmp);
1766   if (!check_file_access(file, sock)) {
1767     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1768   }
1769
1770   status = buffer_get_field(&buffer, &buffer_size, &i);
1771   if (status != 0)
1772     return syntax_error(sock,cmd);
1773   idx = atoi(i);
1774   if(idx<0) { 
1775     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1776   }
1777
1778   /* get data */
1779   rrd_clear_error ();
1780   t = rrd_first_r(file,idx);
1781   if(t<1) {
1782     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1783   }
1784   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first  */
1786
1787
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1789 {
1790   char *file, file_tmp[PATH_MAX];
1791   int status;
1792   time_t t, from_file, step;
1793   rrd_file_t * rrd_file;
1794   cache_item_t * ci;
1795   rrd_t rrd; 
1796
1797   /* obtain filename */
1798   status = buffer_get_field(&buffer, &buffer_size, &file);
1799   if (status != 0)
1800     return syntax_error(sock,cmd);
1801   /* get full pathname */
1802   get_abs_path(&file, file_tmp);
1803   if (!check_file_access(file, sock)) {
1804     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1805   }
1806   rrd_clear_error();
1807   rrd_init(&rrd);
1808   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1809   if(!rrd_file) {
1810     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1811   }
1812   from_file = rrd.live_head->last_up;
1813   step = rrd.stat_head->pdp_step;
1814   rrd_close(rrd_file);
1815   pthread_mutex_lock(&cache_lock);
1816   ci = g_tree_lookup(cache_tree, file);
1817   if (ci)
1818     t = ci->last_update_stamp;
1819   else
1820     t = from_file;
1821   pthread_mutex_unlock(&cache_lock);
1822   t -= t % step;
1823   rrd_free(&rrd);
1824   if(t<1) {
1825     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1826   }
1827   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last  */
1829
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1831 {
1832   char *file, file_tmp[PATH_MAX];
1833   char *tok;
1834   int ac = 0;
1835   char *av[128];
1836   int status;
1837   unsigned long step = 300;
1838   time_t last_up = time(NULL)-10;
1839   int no_overwrite = opt_no_overwrite;
1840
1841
1842   /* obtain filename */
1843   status = buffer_get_field(&buffer, &buffer_size, &file);
1844   if (status != 0)
1845     return syntax_error(sock,cmd);
1846   /* get full pathname */
1847   get_abs_path(&file, file_tmp);
1848   if (!check_file_access(file, sock)) {
1849     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1850   }
1851   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1852
1853   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854     if( ! strncmp(tok,"-b",2) ) {
1855       status = buffer_get_field(&buffer, &buffer_size, &tok );
1856       if (status != 0) return syntax_error(sock,cmd);
1857       last_up = (time_t) atol(tok);
1858       continue;
1859     }
1860     if( ! strncmp(tok,"-s",2) ) {
1861       status = buffer_get_field(&buffer, &buffer_size, &tok );
1862       if (status != 0) return syntax_error(sock,cmd);
1863       step = atol(tok);
1864       continue;
1865     }
1866     if( ! strncmp(tok,"-O",2) ) {
1867       no_overwrite = 1;
1868       continue;
1869     }
1870     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1872     return syntax_error(sock,cmd);
1873   }
1874   if(step<1) {
1875     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1876   }
1877   if (last_up < 3600 * 24 * 365 * 10) {
1878     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1879   }
1880
1881   rrd_clear_error ();
1882   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1883
1884   if(!status) {
1885     return send_response(sock, RESP_OK, "RRD created OK\n");
1886   }
1887   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create  */
1889
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
1892 {
1893   int status;
1894   if (sock->batch_start)
1895     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1896
1897   status = send_response(sock, RESP_OK,
1898                          "Go ahead.  End with dot '.' on its own line.\n");
1899   sock->batch_start = time(NULL);
1900   sock->batch_cmd = 0;
1901
1902   return status;
1903 } /* }}} static int batch_start */
1904
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
1907 {
1908   assert(sock->batch_start);
1909   sock->batch_start = 0;
1910   sock->batch_cmd  = 0;
1911   return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
1913
1914 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1915 {
1916   return -1;
1917 } /* }}} static int handle_request_quit */
1918
1919 static command_t list_of_commands[] = { /* {{{ */
1920   {
1921     "UPDATE",
1922     handle_request_update,
1923     CMD_CONTEXT_ANY,
1924     "UPDATE <filename> <values> [<values> ...]\n"
1925     ,
1926     "Adds the given file to the internal cache if it is not yet known and\n"
1927     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1928     "for details.\n"
1929     "\n"
1930     "Each <values> has the following form:\n"
1931     "  <values> = <time>:<value>[:<value>[...]]\n"
1932     "See the rrdupdate(1) manpage for details.\n"
1933   },
1934   {
1935     "WROTE",
1936     handle_request_wrote,
1937     CMD_CONTEXT_JOURNAL,
1938     NULL,
1939     NULL
1940   },
1941   {
1942     "FLUSH",
1943     handle_request_flush,
1944     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945     "FLUSH <filename>\n"
1946     ,
1947     "Adds the given filename to the head of the update queue and returns\n"
1948     "after it has been dequeued.\n"
1949   },
1950   {
1951     "FLUSHALL",
1952     handle_request_flushall,
1953     CMD_CONTEXT_CLIENT,
1954     "FLUSHALL\n"
1955     ,
1956     "Triggers writing of all pending updates.  Returns immediately.\n"
1957   },
1958   {
1959     "PENDING",
1960     handle_request_pending,
1961     CMD_CONTEXT_CLIENT,
1962     "PENDING <filename>\n"
1963     ,
1964     "Shows any 'pending' updates for a file, in order.\n"
1965     "The updates shown have not yet been written to the underlying RRD file.\n"
1966   },
1967   {
1968     "FORGET",
1969     handle_request_forget,
1970     CMD_CONTEXT_ANY,
1971     "FORGET <filename>\n"
1972     ,
1973     "Removes the file completely from the cache.\n"
1974     "Any pending updates for the file will be lost.\n"
1975   },
1976   {
1977     "QUEUE",
1978     handle_request_queue,
1979     CMD_CONTEXT_CLIENT,
1980     "QUEUE\n"
1981     ,
1982         "Shows all files in the output queue.\n"
1983     "The output is zero or more lines in the following format:\n"
1984     "(where <num_vals> is the number of values to be written)\n"
1985     "\n"
1986     "<num_vals> <filename>\n"
1987   },
1988   {
1989     "STATS",
1990     handle_request_stats,
1991     CMD_CONTEXT_CLIENT,
1992     "STATS\n"
1993     ,
1994     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995     "a description of the values.\n"
1996   },
1997   {
1998     "HELP",
1999     handle_request_help,
2000     CMD_CONTEXT_CLIENT,
2001     "HELP [<command>]\n",
2002     NULL, /* special! */
2003   },
2004   {
2005     "BATCH",
2006     batch_start,
2007     CMD_CONTEXT_CLIENT,
2008     "BATCH\n"
2009     ,
2010     "The 'BATCH' command permits the client to initiate a bulk load\n"
2011     "   of commands to rrdcached.\n"
2012     "\n"
2013     "Usage:\n"
2014     "\n"
2015     "    client: BATCH\n"
2016     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2017     "    client: command #1\n"
2018     "    client: command #2\n"
2019     "    client: ... and so on\n"
2020     "    client: .\n"
2021     "    server: 2 errors\n"
2022     "    server: 7 message for command #7\n"
2023     "    server: 9 message for command #9\n"
2024     "\n"
2025     "For more information, consult the rrdcached(1) documentation.\n"
2026   },
2027   {
2028     ".",   /* BATCH terminator */
2029     batch_done,
2030     CMD_CONTEXT_BATCH,
2031     NULL,
2032     NULL
2033   },
2034   {
2035     "FETCH",
2036     handle_request_fetch,
2037     CMD_CONTEXT_CLIENT,
2038     "FETCH <file> <CF> [<start> [<end>]]\n"
2039     ,
2040     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2041   },
2042   {
2043     "INFO",
2044     handle_request_info,
2045     CMD_CONTEXT_CLIENT,
2046     "INFO <filename>\n",
2047     "The INFO command retrieves information about a specified RRD file.\n"
2048     "This is returned in standard rrdinfo format, a sequence of lines\n"
2049     "with the format <keyname> = <value>\n"
2050     "Note that this is the data as of the last update of the RRD file itself,\n"
2051     "not the last time data was received via rrdcached, so there may be pending\n"
2052     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2053   },
2054   {
2055     "FIRST",
2056     handle_request_first,
2057     CMD_CONTEXT_CLIENT,
2058     "FIRST <filename> <rra index>\n",
2059     "The FIRST command retrieves the first data time for a specified RRA in\n"
2060     "an RRD file.\n"
2061   },
2062   {
2063     "LAST",
2064     handle_request_last,
2065     CMD_CONTEXT_CLIENT,
2066     "LAST <filename>\n",
2067     "The LAST command retrieves the last update time for a specified RRD file.\n"
2068     "Note that this is the time of the last update of the RRD file itself, not\n"
2069     "the last time data was received via rrdcached, so there may be pending\n"
2070     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2071   },
2072   {
2073     "CREATE",
2074     handle_request_create,
2075     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077     "The CREATE command will create an RRD file, overwriting any existing file\n"
2078     "unless the -O option is given or rrdcached was started with the -O option.\n"
2079     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080     "not acceptable) and the step is in seconds (default is 300).\n"
2081     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2082   },
2083   {
2084     "QUIT",
2085     handle_request_quit,
2086     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2087     "QUIT\n"
2088     ,
2089     "Disconnect from rrdcached.\n"
2090   }
2091 }; /* }}} command_t list_of_commands[] */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093   / sizeof (list_of_commands[0]);
2094
2095 static command_t *find_command(char *cmd)
2096 {
2097   size_t i;
2098
2099   for (i = 0; i < list_of_commands_len; i++)
2100     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101       return (&list_of_commands[i]);
2102   return NULL;
2103 }
2104
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2107  * outside these functions so that switching to a more elegant storage method
2108  * is easily possible. */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2110 {
2111   size_t i;
2112
2113   for (i = 0; i < list_of_commands_len; i++)
2114     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115       return ((ssize_t) i);
2116   return (-1);
2117 } /* }}} ssize_t find_command_index */
2118
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2120     const char *cmd)
2121 {
2122   ssize_t i;
2123
2124   if (JOURNAL_REPLAY(sock))
2125     return (1);
2126
2127   if (cmd == NULL)
2128     return (-1);
2129
2130   if ((strcasecmp ("QUIT", cmd) == 0)
2131       || (strcasecmp ("HELP", cmd) == 0))
2132     return (1);
2133   else if (strcmp (".", cmd) == 0)
2134     cmd = "BATCH";
2135
2136   i = find_command_index (cmd);
2137   if (i < 0)
2138     return (-1);
2139   assert (i < 32);
2140
2141   if ((sock->permissions & (1 << i)) != 0)
2142     return (1);
2143   return (0);
2144 } /* }}} int socket_permission_check */
2145
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2147     const char *cmd)
2148 {
2149   ssize_t i;
2150
2151   i = find_command_index (cmd);
2152   if (i < 0)
2153     return (-1);
2154   assert (i < 32);
2155
2156   sock->permissions |= (1 << i);
2157   return (0);
2158 } /* }}} int socket_permission_add */
2159
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2161 {
2162   sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
2164
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166     listen_socket_t *src)
2167 {
2168   dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
2170
2171 /* check whether commands are received in the expected context */
2172 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2173 {
2174   if (JOURNAL_REPLAY(sock))
2175     return (cmd->context & CMD_CONTEXT_JOURNAL);
2176   else if (sock->batch_start)
2177     return (cmd->context & CMD_CONTEXT_BATCH);
2178   else
2179     return (cmd->context & CMD_CONTEXT_CLIENT);
2180
2181   /* NOTREACHED */
2182   assert(1==0);
2183 }
2184
2185 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2186 {
2187   int status;
2188   char *cmd_str;
2189   char *resp_txt;
2190   command_t *help = NULL;
2191
2192   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2193   if (status == 0)
2194     help = find_command(cmd_str);
2195
2196   if (help && (help->syntax || help->help))
2197   {
2198     char tmp[CMD_MAX];
2199
2200     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2201     resp_txt = tmp;
2202
2203     if (help->syntax)
2204       add_response_info(sock, "Usage: %s\n", help->syntax);
2205
2206     if (help->help)
2207       add_response_info(sock, "%s\n", help->help);
2208   }
2209   else
2210   {
2211     size_t i;
2212
2213     resp_txt = "Command overview\n";
2214
2215     for (i = 0; i < list_of_commands_len; i++)
2216     {
2217       if (list_of_commands[i].syntax == NULL)
2218         continue;
2219       add_response_info (sock, "%s", list_of_commands[i].syntax);
2220     }
2221   }
2222
2223   return send_response(sock, RESP_OK, resp_txt);
2224 } /* }}} int handle_request_help */
2225
2226 static int handle_request (DISPATCH_PROTO) /* {{{ */
2227 {
2228   char *buffer_ptr = buffer;
2229   char *cmd_str = NULL;
2230   command_t *cmd = NULL;
2231   int status;
2232
2233   assert (buffer[buffer_size - 1] == '\0');
2234
2235   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2236   if (status != 0)
2237   {
2238     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2239     return (-1);
2240   }
2241
2242   if (sock != NULL && sock->batch_start)
2243     sock->batch_cmd++;
2244
2245   cmd = find_command(cmd_str);
2246   if (!cmd)
2247     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2248
2249   if (!socket_permission_check (sock, cmd->cmd))
2250     return send_response(sock, RESP_ERR, "Permission denied.\n");
2251
2252   if (!command_check_context(sock, cmd))
2253     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2254
2255   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2256 } /* }}} int handle_request */
2257
2258 static void journal_set_free (journal_set *js) /* {{{ */
2259 {
2260   if (js == NULL)
2261     return;
2262
2263   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2264
2265   free(js);
2266 } /* }}} journal_set_free */
2267
2268 static void journal_set_remove (journal_set *js) /* {{{ */
2269 {
2270   if (js == NULL)
2271     return;
2272
2273   for (uint i=0; i < js->files_num; i++)
2274   {
2275     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2276     unlink(js->files[i]);
2277   }
2278 } /* }}} journal_set_remove */
2279
2280 /* close current journal file handle.
2281  * MUST hold journal_lock before calling */
2282 static void journal_close(void) /* {{{ */
2283 {
2284   if (journal_fh != NULL)
2285   {
2286     if (fclose(journal_fh) != 0)
2287       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2288   }
2289
2290   journal_fh = NULL;
2291   journal_size = 0;
2292 } /* }}} journal_close */
2293
2294 /* MUST hold journal_lock before calling */
2295 static void journal_new_file(void) /* {{{ */
2296 {
2297   struct timeval now;
2298   int  new_fd;
2299   char new_file[PATH_MAX + 1];
2300
2301   assert(journal_dir != NULL);
2302   assert(journal_cur != NULL);
2303
2304   journal_close();
2305
2306   gettimeofday(&now, NULL);
2307   /* this format assures that the files sort in strcmp() order */
2308   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2309            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2310
2311   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2312                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2313   if (new_fd < 0)
2314     goto error;
2315
2316   journal_fh = fdopen(new_fd, "a");
2317   if (journal_fh == NULL)
2318     goto error;
2319
2320   journal_size = ftell(journal_fh);
2321   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2322
2323   /* record the file in the journal set */
2324   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2325
2326   return;
2327
2328 error:
2329   RRDD_LOG(LOG_CRIT,
2330            "JOURNALING DISABLED: Error while trying to create %s : %s",
2331            new_file, rrd_strerror(errno));
2332   RRDD_LOG(LOG_CRIT,
2333            "JOURNALING DISABLED: All values will be flushed at shutdown");
2334
2335   close(new_fd);
2336   config_flush_at_shutdown = 1;
2337
2338 } /* }}} journal_new_file */
2339
2340 /* MUST NOT hold journal_lock before calling this */
2341 static void journal_rotate(void) /* {{{ */
2342 {
2343   journal_set *old_js = NULL;
2344
2345   if (journal_dir == NULL)
2346     return;
2347
2348   RRDD_LOG(LOG_DEBUG, "rotating journals");
2349
2350   pthread_mutex_lock(&stats_lock);
2351   ++stats_journal_rotate;
2352   pthread_mutex_unlock(&stats_lock);
2353
2354   pthread_mutex_lock(&journal_lock);
2355
2356   journal_close();
2357
2358   /* rotate the journal sets */
2359   old_js = journal_old;
2360   journal_old = journal_cur;
2361   journal_cur = calloc(1, sizeof(journal_set));
2362
2363   if (journal_cur != NULL)
2364     journal_new_file();
2365   else
2366     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2367
2368   pthread_mutex_unlock(&journal_lock);
2369
2370   journal_set_remove(old_js);
2371   journal_set_free  (old_js);
2372
2373 } /* }}} static void journal_rotate */
2374
2375 /* MUST hold journal_lock when calling */
2376 static void journal_done(void) /* {{{ */
2377 {
2378   if (journal_cur == NULL)
2379     return;
2380
2381   journal_close();
2382
2383   if (config_flush_at_shutdown)
2384   {
2385     RRDD_LOG(LOG_INFO, "removing journals");
2386     journal_set_remove(journal_old);
2387     journal_set_remove(journal_cur);
2388   }
2389   else
2390   {
2391     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2392              "journals will be used at next startup");
2393   }
2394
2395   journal_set_free(journal_cur);
2396   journal_set_free(journal_old);
2397   free(journal_dir);
2398
2399 } /* }}} static void journal_done */
2400
2401 static int journal_write(char *cmd, char *args) /* {{{ */
2402 {
2403   int chars;
2404
2405   if (journal_fh == NULL)
2406     return 0;
2407
2408   pthread_mutex_lock(&journal_lock);
2409   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2410   journal_size += chars;
2411
2412   if (journal_size > JOURNAL_MAX)
2413     journal_new_file();
2414
2415   pthread_mutex_unlock(&journal_lock);
2416
2417   if (chars > 0)
2418   {
2419     pthread_mutex_lock(&stats_lock);
2420     stats_journal_bytes += chars;
2421     pthread_mutex_unlock(&stats_lock);
2422   }
2423
2424   return chars;
2425 } /* }}} static int journal_write */
2426
2427 static int journal_replay (const char *file) /* {{{ */
2428 {
2429   FILE *fh;
2430   int entry_cnt = 0;
2431   int fail_cnt = 0;
2432   uint64_t line = 0;
2433   char entry[CMD_MAX];
2434   time_t now;
2435
2436   if (file == NULL) return 0;
2437
2438   {
2439     char *reason = "unknown error";
2440     int status = 0;
2441     struct stat statbuf;
2442
2443     memset(&statbuf, 0, sizeof(statbuf));
2444     if (stat(file, &statbuf) != 0)
2445     {
2446       reason = "stat error";
2447       status = errno;
2448     }
2449     else if (!S_ISREG(statbuf.st_mode))
2450     {
2451       reason = "not a regular file";
2452       status = EPERM;
2453     }
2454     if (statbuf.st_uid != daemon_uid)
2455     {
2456       reason = "not owned by daemon user";
2457       status = EACCES;
2458     }
2459     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2460     {
2461       reason = "must not be user/group writable";
2462       status = EACCES;
2463     }
2464
2465     if (status != 0)
2466     {
2467       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2468                file, rrd_strerror(status), reason);
2469       return 0;
2470     }
2471   }
2472
2473   fh = fopen(file, "r");
2474   if (fh == NULL)
2475   {
2476     if (errno != ENOENT)
2477       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2478                file, rrd_strerror(errno));
2479     return 0;
2480   }
2481   else
2482     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2483
2484   now = time(NULL);
2485
2486   while(!feof(fh))
2487   {
2488     size_t entry_len;
2489
2490     ++line;
2491     if (fgets(entry, sizeof(entry), fh) == NULL)
2492       break;
2493     entry_len = strlen(entry);
2494
2495     /* check \n termination in case journal writing crashed mid-line */
2496     if (entry_len == 0)
2497       continue;
2498     else if (entry[entry_len - 1] != '\n')
2499     {
2500       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2501       ++fail_cnt;
2502       continue;
2503     }
2504
2505     entry[entry_len - 1] = '\0';
2506
2507     if (handle_request(NULL, now, entry, entry_len) == 0)
2508       ++entry_cnt;
2509     else
2510       ++fail_cnt;
2511   }
2512
2513   fclose(fh);
2514
2515   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2516            entry_cnt, fail_cnt);
2517
2518   return entry_cnt > 0 ? 1 : 0;
2519 } /* }}} static int journal_replay */
2520
2521 static int journal_sort(const void *v1, const void *v2)
2522 {
2523   char **jn1 = (char **) v1;
2524   char **jn2 = (char **) v2;
2525
2526   return strcmp(*jn1,*jn2);
2527 }
2528
2529 static void journal_init(void) /* {{{ */
2530 {
2531   int had_journal = 0;
2532   DIR *dir;
2533   struct dirent *dent;
2534   char path[PATH_MAX+1];
2535
2536   if (journal_dir == NULL) return;
2537
2538   pthread_mutex_lock(&journal_lock);
2539
2540   journal_cur = calloc(1, sizeof(journal_set));
2541   if (journal_cur == NULL)
2542   {
2543     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2544     return;
2545   }
2546
2547   RRDD_LOG(LOG_INFO, "checking for journal files");
2548
2549   /* Handle old journal files during transition.  This gives them the
2550    * correct sort order.  TODO: remove after first release
2551    */
2552   {
2553     char old_path[PATH_MAX+1];
2554     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2555     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2556     rename(old_path, path);
2557
2558     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2559     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2560     rename(old_path, path);
2561   }
2562
2563   dir = opendir(journal_dir);
2564   if (!dir) {
2565     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2566     return;
2567   }
2568   while ((dent = readdir(dir)) != NULL)
2569   {
2570     /* looks like a journal file? */
2571     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2572       continue;
2573
2574     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2575
2576     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2577     {
2578       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2579                dent->d_name);
2580       break;
2581     }
2582   }
2583   closedir(dir);
2584
2585   qsort(journal_cur->files, journal_cur->files_num,
2586         sizeof(journal_cur->files[0]), journal_sort);
2587
2588   for (uint i=0; i < journal_cur->files_num; i++)
2589     had_journal += journal_replay(journal_cur->files[i]);
2590
2591   journal_new_file();
2592
2593   /* it must have been a crash.  start a flush */
2594   if (had_journal && config_flush_at_shutdown)
2595     flush_old_values(-1);
2596
2597   pthread_mutex_unlock(&journal_lock);
2598
2599   RRDD_LOG(LOG_INFO, "journal processing complete");
2600
2601 } /* }}} static void journal_init */
2602
2603 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2604 {
2605   assert(sock != NULL);
2606
2607   free(sock->rbuf);  sock->rbuf = NULL;
2608   free(sock->wbuf);  sock->wbuf = NULL;
2609   free(sock);
2610 } /* }}} void free_listen_socket */
2611
2612 static void close_connection(listen_socket_t *sock) /* {{{ */
2613 {
2614   if (sock->fd >= 0)
2615   {
2616     close(sock->fd);
2617     sock->fd = -1;
2618   }
2619
2620   free_listen_socket(sock);
2621
2622 } /* }}} void close_connection */
2623
2624 static void *connection_thread_main (void *args) /* {{{ */
2625 {
2626   listen_socket_t *sock;
2627   int fd;
2628
2629   sock = (listen_socket_t *) args;
2630   fd = sock->fd;
2631
2632   /* init read buffers */
2633   sock->next_read = sock->next_cmd = 0;
2634   sock->rbuf = malloc(RBUF_SIZE);
2635   if (sock->rbuf == NULL)
2636   {
2637     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2638     close_connection(sock);
2639     return NULL;
2640   }
2641
2642   pthread_mutex_lock (&connection_threads_lock);
2643 #ifdef HAVE_LIBWRAP
2644   /* LIBWRAP does not support multiple threads! By putting this code
2645      inside pthread_mutex_lock we do not have to worry about request_info
2646      getting overwritten by another thread.
2647   */
2648   struct request_info req;
2649   request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2650   fromhost(&req);
2651   if(!hosts_access(&req)) {
2652     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2653     pthread_mutex_unlock (&connection_threads_lock);
2654     close_connection(sock);
2655     return NULL;
2656   }
2657 #endif /* HAVE_LIBWRAP */
2658   connection_threads_num++;
2659   pthread_mutex_unlock (&connection_threads_lock);
2660
2661   while (state == RUNNING)
2662   {
2663     char *cmd;
2664     ssize_t cmd_len;
2665     ssize_t rbytes;
2666     time_t now;
2667
2668     struct pollfd pollfd;
2669     int status;
2670
2671     pollfd.fd = fd;
2672     pollfd.events = POLLIN | POLLPRI;
2673     pollfd.revents = 0;
2674
2675     status = poll (&pollfd, 1, /* timeout = */ 500);
2676     if (state != RUNNING)
2677       break;
2678     else if (status == 0) /* timeout */
2679       continue;
2680     else if (status < 0) /* error */
2681     {
2682       status = errno;
2683       if (status != EINTR)
2684         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2685       continue;
2686     }
2687
2688     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2689       break;
2690     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2691     {
2692       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2693           "poll(2) returned something unexpected: %#04hx",
2694           pollfd.revents);
2695       break;
2696     }
2697
2698     rbytes = read(fd, sock->rbuf + sock->next_read,
2699                   RBUF_SIZE - sock->next_read);
2700     if (rbytes < 0)
2701     {
2702       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2703       break;
2704     }
2705     else if (rbytes == 0)
2706       break; /* eof */
2707
2708     sock->next_read += rbytes;
2709
2710     if (sock->batch_start)
2711       now = sock->batch_start;
2712     else
2713       now = time(NULL);
2714
2715     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2716     {
2717       status = handle_request (sock, now, cmd, cmd_len+1);
2718       if (status != 0)
2719         goto out_close;
2720     }
2721   }
2722
2723 out_close:
2724   close_connection(sock);
2725
2726   /* Remove this thread from the connection threads list */
2727   pthread_mutex_lock (&connection_threads_lock);
2728   connection_threads_num--;
2729   if (connection_threads_num <= 0)
2730     pthread_cond_broadcast(&connection_threads_done);
2731   pthread_mutex_unlock (&connection_threads_lock);
2732
2733   return (NULL);
2734 } /* }}} void *connection_thread_main */
2735
2736 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2737 {
2738   int fd;
2739   struct sockaddr_un sa;
2740   listen_socket_t *temp;
2741   int status;
2742   const char *path;
2743   char *path_copy, *dir;
2744
2745   path = sock->addr;
2746   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2747     path += strlen("unix:");
2748
2749   /* dirname may modify its argument */
2750   path_copy = strdup(path);
2751   if (path_copy == NULL)
2752   {
2753     fprintf(stderr, "rrdcached: strdup(): %s\n",
2754         rrd_strerror(errno));
2755     return (-1);
2756   }
2757
2758   dir = dirname(path_copy);
2759   if (rrd_mkdir_p(dir, 0777) != 0)
2760   {
2761     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2762         dir, rrd_strerror(errno));
2763     return (-1);
2764   }
2765
2766   free(path_copy);
2767
2768   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2769       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2770   if (temp == NULL)
2771   {
2772     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2773     return (-1);
2774   }
2775   listen_fds = temp;
2776   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2777
2778   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2779   if (fd < 0)
2780   {
2781     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2782              rrd_strerror(errno));
2783     return (-1);
2784   }
2785
2786   memset (&sa, 0, sizeof (sa));
2787   sa.sun_family = AF_UNIX;
2788   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2789
2790   /* if we've gotten this far, we own the pid file.  any daemon started
2791    * with the same args must not be alive.  therefore, ensure that we can
2792    * create the socket...
2793    */
2794   unlink(path);
2795
2796   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2797   if (status != 0)
2798   {
2799     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2800              path, rrd_strerror(errno));
2801     close (fd);
2802     return (-1);
2803   }
2804
2805   /* tweak the sockets group ownership */
2806   if (sock->socket_group != (gid_t)-1)
2807   {
2808     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2809          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2810     {
2811       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2812     }
2813   }
2814
2815   if (sock->socket_permissions != (mode_t)-1)
2816   {
2817     if (chmod(path, sock->socket_permissions) != 0)
2818       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2819           (unsigned int)sock->socket_permissions, strerror(errno));
2820   }
2821
2822   status = listen (fd, /* backlog = */ 10);
2823   if (status != 0)
2824   {
2825     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2826              path, rrd_strerror(errno));
2827     close (fd);
2828     unlink (path);
2829     return (-1);
2830   }
2831
2832   listen_fds[listen_fds_num].fd = fd;
2833   listen_fds[listen_fds_num].family = PF_UNIX;
2834   strncpy(listen_fds[listen_fds_num].addr, path,
2835           sizeof (listen_fds[listen_fds_num].addr) - 1);
2836   listen_fds_num++;
2837
2838   return (0);
2839 } /* }}} int open_listen_socket_unix */
2840
2841 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2842 {
2843   struct addrinfo ai_hints;
2844   struct addrinfo *ai_res;
2845   struct addrinfo *ai_ptr;
2846   char addr_copy[NI_MAXHOST];
2847   char *addr;
2848   char *port;
2849   int status;
2850
2851   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2852   addr_copy[sizeof (addr_copy) - 1] = 0;
2853   addr = addr_copy;
2854
2855   memset (&ai_hints, 0, sizeof (ai_hints));
2856   ai_hints.ai_flags = 0;
2857 #ifdef AI_ADDRCONFIG
2858   ai_hints.ai_flags |= AI_ADDRCONFIG;
2859 #endif
2860   ai_hints.ai_family = AF_UNSPEC;
2861   ai_hints.ai_socktype = SOCK_STREAM;
2862
2863   port = NULL;
2864   if (*addr == '[') /* IPv6+port format */
2865   {
2866     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2867     addr++;
2868
2869     port = strchr (addr, ']');
2870     if (port == NULL)
2871     {
2872       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2873       return (-1);
2874     }
2875     *port = 0;
2876     port++;
2877
2878     if (*port == ':')
2879       port++;
2880     else if (*port == 0)
2881       port = NULL;
2882     else
2883     {
2884       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2885       return (-1);
2886     }
2887   } /* if (*addr == '[') */
2888   else
2889   {
2890     port = rindex(addr, ':');
2891     if (port != NULL)
2892     {
2893       *port = 0;
2894       port++;
2895     }
2896   }
2897   ai_res = NULL;
2898   status = getaddrinfo (addr,
2899                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2900                         &ai_hints, &ai_res);
2901   if (status != 0)
2902   {
2903     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2904              addr, gai_strerror (status));
2905     return (-1);
2906   }
2907
2908   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2909   {
2910     int fd;
2911     listen_socket_t *temp;
2912     int one = 1;
2913
2914     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2915         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2916     if (temp == NULL)
2917     {
2918       fprintf (stderr,
2919                "rrdcached: open_listen_socket_network: realloc failed.\n");
2920       continue;
2921     }
2922     listen_fds = temp;
2923     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2924
2925     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2926     if (fd < 0)
2927     {
2928       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2929                rrd_strerror(errno));
2930       continue;
2931     }
2932
2933     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2934
2935     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2936     if (status != 0)
2937     {
2938       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2939                sock->addr, rrd_strerror(errno));
2940       close (fd);
2941       continue;
2942     }
2943
2944     status = listen (fd, /* backlog = */ 10);
2945     if (status != 0)
2946     {
2947       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2948                sock->addr, rrd_strerror(errno));
2949       close (fd);
2950       freeaddrinfo(ai_res);
2951       return (-1);
2952     }
2953
2954     listen_fds[listen_fds_num].fd = fd;
2955     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2956     listen_fds_num++;
2957   } /* for (ai_ptr) */
2958
2959   freeaddrinfo(ai_res);
2960   return (0);
2961 } /* }}} static int open_listen_socket_network */
2962
2963 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2964 {
2965   assert(sock != NULL);
2966   assert(sock->addr != NULL);
2967
2968   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2969       || sock->addr[0] == '/')
2970     return (open_listen_socket_unix(sock));
2971   else
2972     return (open_listen_socket_network(sock));
2973 } /* }}} int open_listen_socket */
2974
2975 static int close_listen_sockets (void) /* {{{ */
2976 {
2977   size_t i;
2978
2979   for (i = 0; i < listen_fds_num; i++)
2980   {
2981     close (listen_fds[i].fd);
2982
2983     if (listen_fds[i].family == PF_UNIX)
2984       unlink(listen_fds[i].addr);
2985   }
2986
2987   free (listen_fds);
2988   listen_fds = NULL;
2989   listen_fds_num = 0;
2990
2991   return (0);
2992 } /* }}} int close_listen_sockets */
2993
2994 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2995 {
2996   struct pollfd *pollfds;
2997   int pollfds_num;
2998   int status;
2999   int i;
3000
3001   if (listen_fds_num < 1)
3002   {
3003     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3004     return (NULL);
3005   }
3006
3007   pollfds_num = listen_fds_num;
3008   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3009   if (pollfds == NULL)
3010   {
3011     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3012     return (NULL);
3013   }
3014   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3015
3016   RRDD_LOG(LOG_INFO, "listening for connections");
3017
3018   while (state == RUNNING)
3019   {
3020     for (i = 0; i < pollfds_num; i++)
3021     {
3022       pollfds[i].fd = listen_fds[i].fd;
3023       pollfds[i].events = POLLIN | POLLPRI;
3024       pollfds[i].revents = 0;
3025     }
3026
3027     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3028     if (state != RUNNING)
3029       break;
3030     else if (status == 0) /* timeout */
3031       continue;
3032     else if (status < 0) /* error */
3033     {
3034       status = errno;
3035       if (status != EINTR)
3036       {
3037         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3038       }
3039       continue;
3040     }
3041
3042     for (i = 0; i < pollfds_num; i++)
3043     {
3044       listen_socket_t *client_sock;
3045       struct sockaddr_storage client_sa;
3046       socklen_t client_sa_size;
3047       pthread_t tid;
3048       pthread_attr_t attr;
3049
3050       if (pollfds[i].revents == 0)
3051         continue;
3052
3053       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3054       {
3055         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3056             "poll(2) returned something unexpected for listen FD #%i.",
3057             pollfds[i].fd);
3058         continue;
3059       }
3060
3061       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3062       if (client_sock == NULL)
3063       {
3064         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3065         continue;
3066       }
3067       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3068
3069       client_sa_size = sizeof (client_sa);
3070       client_sock->fd = accept (pollfds[i].fd,
3071           (struct sockaddr *) &client_sa, &client_sa_size);
3072       if (client_sock->fd < 0)
3073       {
3074         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3075         free(client_sock);
3076         continue;
3077       }
3078
3079       pthread_attr_init (&attr);
3080       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3081
3082       status = pthread_create (&tid, &attr, connection_thread_main,
3083                                client_sock);
3084       if (status != 0)
3085       {
3086         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3087         close_connection(client_sock);
3088         continue;
3089       }
3090     } /* for (pollfds_num) */
3091   } /* while (state == RUNNING) */
3092
3093   RRDD_LOG(LOG_INFO, "starting shutdown");
3094
3095   close_listen_sockets ();
3096
3097   pthread_mutex_lock (&connection_threads_lock);
3098   while (connection_threads_num > 0)
3099     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3100   pthread_mutex_unlock (&connection_threads_lock);
3101
3102   free(pollfds);
3103
3104   return (NULL);
3105 } /* }}} void *listen_thread_main */
3106
3107 static int daemonize (void) /* {{{ */
3108 {
3109   int pid_fd;
3110   char *base_dir;
3111
3112   daemon_uid = geteuid();
3113
3114   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3115   if (pid_fd < 0)
3116     pid_fd = check_pidfile();
3117   if (pid_fd < 0)
3118     return pid_fd;
3119
3120   /* open all the listen sockets */
3121   if (config_listen_address_list_len > 0)
3122   {
3123     for (size_t i = 0; i < config_listen_address_list_len; i++)
3124       open_listen_socket (config_listen_address_list[i]);
3125
3126     rrd_free_ptrs((void ***) &config_listen_address_list,
3127                   &config_listen_address_list_len);
3128   }
3129   else
3130   {
3131     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3132         sizeof(default_socket.addr) - 1);
3133     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3134     open_listen_socket (&default_socket);
3135   }
3136
3137   if (listen_fds_num < 1)
3138   {
3139     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3140     goto error;
3141   }
3142
3143   if (!stay_foreground)
3144   {
3145     pid_t child;
3146
3147     child = fork ();
3148     if (child < 0)
3149     {
3150       fprintf (stderr, "daemonize: fork(2) failed.\n");
3151       goto error;
3152     }
3153     else if (child > 0)
3154       exit(0);
3155
3156     /* Become session leader */
3157     setsid ();
3158
3159     /* Open the first three file descriptors to /dev/null */
3160     close (2);
3161     close (1);
3162     close (0);
3163
3164     open ("/dev/null", O_RDWR);
3165     if (dup(0) == -1 || dup(0) == -1){
3166         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3167     }
3168   } /* if (!stay_foreground) */
3169
3170   /* Change into the /tmp directory. */
3171   base_dir = (config_base_dir != NULL)
3172     ? config_base_dir
3173     : "/tmp";
3174
3175   if (chdir (base_dir) != 0)
3176   {
3177     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3178     goto error;
3179   }
3180
3181   install_signal_handlers();
3182
3183   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3184   RRDD_LOG(LOG_INFO, "starting up");
3185
3186   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3187                                 (GDestroyNotify) free_cache_item);
3188   if (cache_tree == NULL)
3189   {
3190     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3191     goto error;
3192   }
3193
3194   return write_pidfile (pid_fd);
3195
3196 error:
3197   remove_pidfile();
3198   return -1;
3199 } /* }}} int daemonize */
3200
3201 static int cleanup (void) /* {{{ */
3202 {
3203   pthread_cond_broadcast (&flush_cond);
3204   pthread_join (flush_thread, NULL);
3205
3206   pthread_cond_broadcast (&queue_cond);
3207   for (int i = 0; i < config_queue_threads; i++)
3208     pthread_join (queue_threads[i], NULL);
3209
3210   if (config_flush_at_shutdown)
3211   {
3212     assert(cache_queue_head == NULL);
3213     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3214   }
3215
3216   free(queue_threads);
3217   free(config_base_dir);
3218
3219   pthread_mutex_lock(&cache_lock);
3220   g_tree_destroy(cache_tree);
3221
3222   pthread_mutex_lock(&journal_lock);
3223   journal_done();
3224
3225   RRDD_LOG(LOG_INFO, "goodbye");
3226   closelog ();
3227
3228   remove_pidfile ();
3229   free(config_pid_file);
3230
3231   return (0);
3232 } /* }}} int cleanup */
3233
3234 static int read_options (int argc, char **argv) /* {{{ */
3235 {
3236   int option;
3237   int status = 0;
3238
3239   socket_permission_clear (&default_socket);
3240
3241   default_socket.socket_group = (gid_t)-1;
3242   default_socket.socket_permissions = (mode_t)-1;
3243
3244   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3245   {
3246     switch (option)
3247     {
3248       case 'O':
3249         opt_no_overwrite = 1;
3250         break;
3251
3252       case 'g':
3253         stay_foreground=1;
3254         break;
3255
3256       case 'l':
3257       {
3258         listen_socket_t *new;
3259
3260         new = malloc(sizeof(listen_socket_t));
3261         if (new == NULL)
3262         {
3263           fprintf(stderr, "read_options: malloc failed.\n");
3264           return(2);
3265         }
3266         memset(new, 0, sizeof(listen_socket_t));
3267
3268         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3269
3270         /* Add permissions to the socket {{{ */
3271         if (default_socket.permissions != 0)
3272         {
3273           socket_permission_copy (new, &default_socket);
3274         }
3275         else /* if (default_socket.permissions == 0) */
3276         {
3277           /* Add permission for ALL commands to the socket. */
3278           size_t i;
3279           for (i = 0; i < list_of_commands_len; i++)
3280           {
3281             status = socket_permission_add (new, list_of_commands[i].cmd);
3282             if (status != 0)
3283             {
3284               fprintf (stderr, "read_options: Adding permission \"%s\" to "
3285                   "socket failed. This should never happen, ever! Sorry.\n",
3286                   list_of_commands[i].cmd);
3287               status = 4;
3288             }
3289           }
3290         }
3291         /* }}} Done adding permissions. */
3292
3293         new->socket_group = default_socket.socket_group;
3294         new->socket_permissions = default_socket.socket_permissions;
3295
3296         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3297                          &config_listen_address_list_len, new))
3298         {
3299           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3300           return (2);
3301         }
3302       }
3303       break;
3304
3305       /* set socket group permissions */
3306       case 's':
3307       {
3308         gid_t group_gid;
3309         struct group *grp;
3310
3311         group_gid = strtoul(optarg, NULL, 10);
3312         if (errno != EINVAL && group_gid>0)
3313         {
3314           /* we were passed a number */
3315           grp = getgrgid(group_gid);
3316         }
3317         else
3318         {
3319           grp = getgrnam(optarg);
3320         }
3321
3322         if (grp)
3323         {
3324           default_socket.socket_group = grp->gr_gid;
3325         }
3326         else
3327         {
3328           /* no idea what the user wanted... */
3329           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3330           return (5);
3331         }
3332       }
3333       break;
3334
3335       /* set socket file permissions */
3336       case 'm':
3337       {
3338         long  tmp;
3339         char *endptr = NULL;
3340
3341         tmp = strtol (optarg, &endptr, 8);
3342         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3343             || (tmp > 07777) || (tmp < 0)) {
3344           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3345               optarg);
3346           return (5);
3347         }
3348
3349         default_socket.socket_permissions = (mode_t)tmp;
3350       }
3351       break;
3352
3353       case 'P':
3354       {
3355         char *optcopy;
3356         char *saveptr;
3357         char *dummy;
3358         char *ptr;
3359
3360         socket_permission_clear (&default_socket);
3361
3362         optcopy = strdup (optarg);
3363         dummy = optcopy;
3364         saveptr = NULL;
3365         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3366         {
3367           dummy = NULL;
3368           status = socket_permission_add (&default_socket, ptr);
3369           if (status != 0)
3370           {
3371             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3372                 "socket failed. Most likely, this permission doesn't "
3373                 "exist. Check your command line.\n", ptr);
3374             status = 4;
3375           }
3376         }
3377
3378         free (optcopy);
3379       }
3380       break;
3381
3382       case 'f':
3383       {
3384         int temp;
3385
3386         temp = atoi (optarg);
3387         if (temp > 0)
3388           config_flush_interval = temp;
3389         else
3390         {
3391           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3392           status = 3;
3393         }
3394       }
3395       break;
3396
3397       case 'w':
3398       {
3399         int temp;
3400
3401         temp = atoi (optarg);
3402         if (temp > 0)
3403           config_write_interval = temp;
3404         else
3405         {
3406           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3407           status = 2;
3408         }
3409       }
3410       break;
3411
3412       case 'z':
3413       {
3414         int temp;
3415
3416         temp = atoi(optarg);
3417         if (temp > 0)
3418           config_write_jitter = temp;
3419         else
3420         {
3421           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3422           status = 2;
3423         }
3424
3425         break;
3426       }
3427
3428       case 't':
3429       {
3430         int threads;
3431         threads = atoi(optarg);
3432         if (threads >= 1)
3433           config_queue_threads = threads;
3434         else
3435         {
3436           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3437           return 1;
3438         }
3439       }
3440       break;
3441
3442       case 'B':
3443         config_write_base_only = 1;
3444         break;
3445
3446       case 'b':
3447       {
3448         size_t len;
3449         char base_realpath[PATH_MAX];
3450
3451         if (config_base_dir != NULL)
3452           free (config_base_dir);
3453         config_base_dir = strdup (optarg);
3454         if (config_base_dir == NULL)
3455         {
3456           fprintf (stderr, "read_options: strdup failed.\n");
3457           return (3);
3458         }
3459
3460         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3461         {
3462           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3463               config_base_dir, rrd_strerror (errno));
3464           return (3);
3465         }
3466
3467         /* make sure that the base directory is not resolved via
3468          * symbolic links.  this makes some performance-enhancing
3469          * assumptions possible (we don't have to resolve paths
3470          * that start with a "/")
3471          */
3472         if (realpath(config_base_dir, base_realpath) == NULL)
3473         {
3474           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3475               "%s\n", config_base_dir, rrd_strerror(errno));
3476           return 5;
3477         }
3478
3479         len = strlen (config_base_dir);
3480         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3481         {
3482           config_base_dir[len - 1] = 0;
3483           len--;
3484         }
3485
3486         if (len < 1)
3487         {
3488           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3489           return (4);
3490         }
3491
3492         _config_base_dir_len = len;
3493
3494         len = strlen (base_realpath);
3495         while ((len > 0) && (base_realpath[len - 1] == '/'))
3496         {
3497           base_realpath[len - 1] = '\0';
3498           len--;
3499         }
3500
3501         if (strncmp(config_base_dir,
3502                          base_realpath, sizeof(base_realpath)) != 0)
3503         {
3504           fprintf(stderr,
3505                   "Base directory (-b) resolved via file system links!\n"
3506                   "Please consult rrdcached '-b' documentation!\n"
3507                   "Consider specifying the real directory (%s)\n",
3508                   base_realpath);
3509           return 5;
3510         }
3511       }
3512       break;
3513
3514       case 'p':
3515       {
3516         if (config_pid_file != NULL)
3517           free (config_pid_file);
3518         config_pid_file = strdup (optarg);
3519         if (config_pid_file == NULL)
3520         {
3521           fprintf (stderr, "read_options: strdup failed.\n");
3522           return (3);
3523         }
3524       }
3525       break;
3526
3527       case 'F':
3528         config_flush_at_shutdown = 1;
3529         break;
3530
3531       case 'j':
3532       {
3533         char journal_dir_actual[PATH_MAX];
3534         const char *dir;
3535         dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3536
3537         status = rrd_mkdir_p(dir, 0777);
3538         if (status != 0)
3539         {
3540           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3541               dir, rrd_strerror(errno));
3542           return 6;
3543         }
3544
3545         if (access(dir, R_OK|W_OK|X_OK) != 0)
3546         {
3547           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3548                   errno ? rrd_strerror(errno) : "");
3549           return 6;
3550         }
3551       }
3552       break;
3553
3554       case 'a':
3555       {
3556         int temp = atoi(optarg);
3557         if (temp > 0)
3558           config_alloc_chunk = temp;
3559         else
3560         {
3561           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3562           return 10;
3563         }
3564       }
3565       break;
3566
3567       case 'h':
3568       case '?':
3569         printf ("RRDCacheD %s\n"
3570             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3571             "\n"
3572             "Usage: rrdcached [options]\n"
3573             "\n"
3574             "Valid options are:\n"
3575             "  -l <address>  Socket address to listen to.\n"
3576             "  -P <perms>    Sets the permissions to assign to all following "
3577                             "sockets\n"
3578             "  -w <seconds>  Interval in which to write data.\n"
3579             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3580             "  -t <threads>  Number of write threads.\n"
3581             "  -f <seconds>  Interval in which to flush dead data.\n"
3582             "  -p <file>     Location of the PID-file.\n"
3583             "  -b <dir>      Base directory to change to.\n"
3584             "  -B            Restrict file access to paths within -b <dir>\n"
3585             "  -g            Do not fork and run in the foreground.\n"
3586             "  -j <dir>      Directory in which to create the journal files.\n"
3587             "  -F            Always flush all updates at shutdown\n"
3588             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3589             "                (the socket will also have read/write permissions "
3590                             "for that group)\n"
3591             "  -m <mode>     File permissions (octal) of all following UNIX "
3592                             "sockets\n"
3593             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3594             "  -O            Do not allow CREATE commands to overwrite existing\n"
3595             "                files, even if asked to.\n"
3596             "\n"
3597             "For more information and a detailed description of all options "
3598             "please refer\n"
3599             "to the rrdcached(1) manual page.\n",
3600             VERSION);
3601         if (option == 'h')
3602           status = -1;
3603         else
3604           status = 1;
3605         break;
3606     } /* switch (option) */
3607   } /* while (getopt) */
3608
3609   /* advise the user when values are not sane */
3610   if (config_flush_interval < 2 * config_write_interval)
3611     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3612             " 2x write interval (-w) !\n");
3613   if (config_write_jitter > config_write_interval)
3614     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3615             " write interval (-w) !\n");
3616
3617   if (config_write_base_only && config_base_dir == NULL)
3618     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3619             "  Consult the rrdcached documentation\n");
3620
3621   if (journal_dir == NULL)
3622     config_flush_at_shutdown = 1;
3623
3624   return (status);
3625 } /* }}} int read_options */
3626
3627 int main (int argc, char **argv)
3628 {
3629   int status;
3630
3631   status = read_options (argc, argv);
3632   if (status != 0)
3633   {
3634     if (status < 0)
3635       status = 0;
3636     return (status);
3637   }
3638
3639   status = daemonize ();
3640   if (status != 0)
3641   {
3642     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3643     return (1);
3644   }
3645
3646   journal_init();
3647
3648   /* start the queue threads */
3649   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3650   if (queue_threads == NULL)
3651   {
3652     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3653     cleanup();
3654     return (1);
3655   }
3656   for (int i = 0; i < config_queue_threads; i++)
3657   {
3658     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3659     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3660     if (status != 0)
3661     {
3662       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3663       cleanup();
3664       return (1);
3665     }
3666   }
3667
3668   /* start the flush thread */
3669   memset(&flush_thread, 0, sizeof(flush_thread));
3670   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3671   if (status != 0)
3672   {
3673     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3674     cleanup();
3675     return (1);
3676   }
3677
3678   listen_thread_main (NULL);
3679   cleanup ();
3680
3681   return (0);
3682 } /* int main */
3683
3684 /*
3685  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3686  */