fix flush race in rrdcached -- Christian Hitz
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008-2010 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66
67 #include "rrd_tool.h"
68 #include "rrd_client.h"
69 #include "unused.h"
70
71 #include <stdlib.h>
72
73 #ifndef WIN32
74 #ifdef HAVE_STDINT_H
75 #  include <stdint.h>
76 #endif
77 #include <unistd.h>
78 #include <strings.h>
79 #include <inttypes.h>
80 #include <sys/socket.h>
81
82 #else
83
84 #endif
85 #include <stdio.h>
86 #include <string.h>
87
88 #include <sys/types.h>
89 #include <sys/stat.h>
90 #include <dirent.h>
91 #include <fcntl.h>
92 #include <signal.h>
93 #include <sys/un.h>
94 #include <netdb.h>
95 #include <poll.h>
96 #include <syslog.h>
97 #include <pthread.h>
98 #include <errno.h>
99 #include <assert.h>
100 #include <sys/time.h>
101 #include <time.h>
102 #include <libgen.h>
103 #include <grp.h>
104
105 #ifdef HAVE_LIBWRAP
106 #include <tcpd.h>
107 #endif /* HAVE_LIBWRAP */
108
109 #include <glib-2.0/glib.h>
110 /* }}} */
111
112 #define RRDD_LOG(severity, ...) \
113   do { \
114     if (stay_foreground) { \
115       fprintf(stderr, __VA_ARGS__); \
116       fprintf(stderr, "\n"); } \
117     syslog ((severity), __VA_ARGS__); \
118   } while (0)
119
120 /*
121  * Types
122  */
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
124
125 struct listen_socket_s
126 {
127   int fd;
128   char addr[PATH_MAX + 1];
129   int family;
130
131   /* state for BATCH processing */
132   time_t batch_start;
133   int batch_cmd;
134
135   /* buffered IO */
136   char *rbuf;
137   off_t next_cmd;
138   off_t next_read;
139
140   char *wbuf;
141   ssize_t wbuf_len;
142
143   uint32_t permissions;
144
145   gid_t  socket_group;
146   mode_t socket_permissions;
147 };
148 typedef struct listen_socket_s listen_socket_t;
149
150 struct command_s;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO  listen_socket_t UNUSED(*sock),\
154                         time_t UNUSED(now),\
155                         char  UNUSED(*buffer),\
156                         size_t UNUSED(buffer_size)
157
158 #define HANDLER_PROTO   command_t UNUSED(*cmd),\
159                         DISPATCH_PROTO
160
161 struct command_s {
162   char   *cmd;
163   int (*handler)(HANDLER_PROTO);
164
165   char  context;                /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT      (1<<0)
167 #define CMD_CONTEXT_BATCH       (1<<1)
168 #define CMD_CONTEXT_JOURNAL     (1<<2)
169 #define CMD_CONTEXT_ANY         (0x7f)
170
171   char *syntax;
172   char *help;
173 };
174
175 struct cache_item_s;
176 typedef struct cache_item_s cache_item_t;
177 struct cache_item_s
178 {
179   char *file;
180   char **values;
181   size_t values_num;            /* number of valid pointers */
182   size_t values_alloc;          /* number of allocated pointers */
183   time_t last_flush_time;
184   double last_update_stamp;
185 #define CI_FLAGS_IN_TREE  (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
187   int flags;
188   pthread_cond_t  flushed;
189   cache_item_t *prev;
190   cache_item_t *next;
191 };
192
193 struct callback_flush_data_s
194 {
195   time_t now;
196   time_t abs_timeout;
197   char **keys;
198   size_t keys_num;
199 };
200 typedef struct callback_flush_data_s callback_flush_data_t;
201
202 enum queue_side_e
203 {
204   HEAD,
205   TAIL
206 };
207 typedef enum queue_side_e queue_side_t;
208
209 /* describe a set of journal files */
210 typedef struct {
211   char **files;
212   size_t files_num;
213 } journal_set;
214
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
216
217 /*
218  * Variables
219  */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
222
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
225
226 static listen_socket_t default_socket;
227
228 enum {
229   RUNNING,              /* normal operation */
230   FLUSHING,             /* flushing remaining values */
231   SHUTDOWN              /* shutting down */
232 } state = RUNNING;
233
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
237
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
240
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
244
245 /* Cache stuff */
246 static GTree          *cache_tree = NULL;
247 static cache_item_t   *cache_queue_head = NULL;
248 static cache_item_t   *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
250
251 static int config_write_interval = 300;
252 static int config_write_jitter   = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
260
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
263
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
272
273 static int opt_no_overwrite = 0; /* default for the daemon */
274
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL;         /* current journal file handle */
282 static long  journal_size = 0;          /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
288
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
291
292 /* 
293  * Functions
294  */
295 static void sig_common (const char *sig) /* {{{ */
296 {
297   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298   if (state == RUNNING) {
299       state = FLUSHING;
300   }
301   pthread_cond_broadcast(&flush_cond);
302   pthread_cond_broadcast(&queue_cond);
303 } /* }}} void sig_common */
304
305 static void sig_int_handler (int UNUSED(s)) /* {{{ */
306 {
307   sig_common("INT");
308 } /* }}} void sig_int_handler */
309
310 static void sig_term_handler (int UNUSED(s)) /* {{{ */
311 {
312   sig_common("TERM");
313 } /* }}} void sig_term_handler */
314
315 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
316 {
317   config_flush_at_shutdown = 1;
318   sig_common("USR1");
319 } /* }}} void sig_usr1_handler */
320
321 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
322 {
323   config_flush_at_shutdown = 0;
324   sig_common("USR2");
325 } /* }}} void sig_usr2_handler */
326
327 static void install_signal_handlers(void) /* {{{ */
328 {
329   /* These structures are static, because `sigaction' behaves weird if the are
330    * overwritten.. */
331   static struct sigaction sa_int;
332   static struct sigaction sa_term;
333   static struct sigaction sa_pipe;
334   static struct sigaction sa_usr1;
335   static struct sigaction sa_usr2;
336
337   /* Install signal handlers */
338   memset (&sa_int, 0, sizeof (sa_int));
339   sa_int.sa_handler = sig_int_handler;
340   sigaction (SIGINT, &sa_int, NULL);
341
342   memset (&sa_term, 0, sizeof (sa_term));
343   sa_term.sa_handler = sig_term_handler;
344   sigaction (SIGTERM, &sa_term, NULL);
345
346   memset (&sa_pipe, 0, sizeof (sa_pipe));
347   sa_pipe.sa_handler = SIG_IGN;
348   sigaction (SIGPIPE, &sa_pipe, NULL);
349
350   memset (&sa_pipe, 0, sizeof (sa_usr1));
351   sa_usr1.sa_handler = sig_usr1_handler;
352   sigaction (SIGUSR1, &sa_usr1, NULL);
353
354   memset (&sa_usr2, 0, sizeof (sa_usr2));
355   sa_usr2.sa_handler = sig_usr2_handler;
356   sigaction (SIGUSR2, &sa_usr2, NULL);
357
358 } /* }}} void install_signal_handlers */
359
360 static int open_pidfile(char *action, int oflag) /* {{{ */
361 {
362   int fd;
363   const char *file;
364   char *file_copy, *dir;
365
366   file = (config_pid_file != NULL)
367     ? config_pid_file
368     : LOCALSTATEDIR "/run/rrdcached.pid";
369
370   /* dirname may modify its argument */
371   file_copy = strdup(file);
372   if (file_copy == NULL)
373   {
374     fprintf(stderr, "rrdcached: strdup(): %s\n",
375         rrd_strerror(errno));
376     return -1;
377   }
378
379   dir = dirname(file_copy);
380   if (rrd_mkdir_p(dir, 0777) != 0)
381   {
382     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
383         dir, rrd_strerror(errno));
384     return -1;
385   }
386
387   free(file_copy);
388
389   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
390   if (fd < 0)
391     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
392             action, file, rrd_strerror(errno));
393
394   return(fd);
395 } /* }}} static int open_pidfile */
396
397 /* check existing pid file to see whether a daemon is running */
398 static int check_pidfile(void)
399 {
400   int pid_fd;
401   pid_t pid;
402   char pid_str[16];
403
404   pid_fd = open_pidfile("open", O_RDWR);
405   if (pid_fd < 0)
406     return pid_fd;
407
408   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
409     return -1;
410
411   pid = atoi(pid_str);
412   if (pid <= 0)
413     return -1;
414
415   /* another running process that we can signal COULD be
416    * a competing rrdcached */
417   if (pid != getpid() && kill(pid, 0) == 0)
418   {
419     fprintf(stderr,
420             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
421     close(pid_fd);
422     return -1;
423   }
424
425   lseek(pid_fd, 0, SEEK_SET);
426   if (ftruncate(pid_fd, 0) == -1)
427   {
428     fprintf(stderr,
429             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
430     close(pid_fd);
431     return -1;
432   }
433
434   fprintf(stderr,
435           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
436           "rrdcached: starting normally.\n", pid);
437
438   return pid_fd;
439 } /* }}} static int check_pidfile */
440
441 static int write_pidfile (int fd) /* {{{ */
442 {
443   pid_t pid;
444   FILE *fh;
445
446   pid = getpid ();
447
448   fh = fdopen (fd, "w");
449   if (fh == NULL)
450   {
451     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
452     close(fd);
453     return (-1);
454   }
455
456   fprintf (fh, "%i\n", (int) pid);
457   fclose (fh);
458
459   return (0);
460 } /* }}} int write_pidfile */
461
462 static int remove_pidfile (void) /* {{{ */
463 {
464   char *file;
465   int status;
466
467   file = (config_pid_file != NULL)
468     ? config_pid_file
469     : LOCALSTATEDIR "/run/rrdcached.pid";
470
471   status = unlink (file);
472   if (status == 0)
473     return (0);
474   return (errno);
475 } /* }}} int remove_pidfile */
476
477 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
478 {
479   char *eol;
480
481   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
482                sock->next_read - sock->next_cmd);
483
484   if (eol == NULL)
485   {
486     /* no commands left, move remainder back to front of rbuf */
487     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
488             sock->next_read - sock->next_cmd);
489     sock->next_read -= sock->next_cmd;
490     sock->next_cmd = 0;
491     *len = 0;
492     return NULL;
493   }
494   else
495   {
496     char *cmd = sock->rbuf + sock->next_cmd;
497     *eol = '\0';
498
499     sock->next_cmd = eol - sock->rbuf + 1;
500
501     if (eol > sock->rbuf && *(eol-1) == '\r')
502       *(--eol) = '\0'; /* handle "\r\n" EOL */
503
504     *len = eol - cmd;
505
506     return cmd;
507   }
508
509   /* NOTREACHED */
510   assert(1==0);
511 } /* }}} char *next_cmd */
512
513 /* add the characters directly to the write buffer */
514 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
515 {
516   char *new_buf;
517
518   assert(sock != NULL);
519
520   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
521   if (new_buf == NULL)
522   {
523     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
524     return -1;
525   }
526
527   strncpy(new_buf + sock->wbuf_len, str, len + 1);
528
529   sock->wbuf = new_buf;
530   sock->wbuf_len += len;
531
532   return 0;
533 } /* }}} static int add_to_wbuf */
534
535 /* add the text to the "extra" info that's sent after the status line */
536 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
537 {
538   va_list argp;
539   char buffer[RRD_CMD_MAX];
540   int len;
541
542   if (JOURNAL_REPLAY(sock)) return 0;
543   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544
545   va_start(argp, fmt);
546 #ifdef HAVE_VSNPRINTF
547   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
548 #else
549   len = vsprintf(buffer, fmt, argp);
550 #endif
551   va_end(argp);
552   if (len < 0)
553   {
554     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
555     return -1;
556   }
557
558   return add_to_wbuf(sock, buffer, len);
559 } /* }}} static int add_response_info */
560
561 static int count_lines(char *str) /* {{{ */
562 {
563   int lines = 0;
564
565   if (str != NULL)
566   {
567     while ((str = strchr(str, '\n')) != NULL)
568     {
569       ++lines;
570       ++str;
571     }
572   }
573
574   return lines;
575 } /* }}} static int count_lines */
576
577 /* send the response back to the user.
578  * returns 0 on success, -1 on error
579  * write buffer is always zeroed after this call */
580 static int send_response (listen_socket_t *sock, response_code rc,
581                           char *fmt, ...) /* {{{ */
582 {
583   va_list argp;
584   char buffer[RRD_CMD_MAX];
585   int lines;
586   ssize_t wrote;
587   int rclen, len;
588
589   if (JOURNAL_REPLAY(sock)) return rc;
590
591   if (sock->batch_start)
592   {
593     if (rc == RESP_OK)
594       return rc; /* no response on success during BATCH */
595     lines = sock->batch_cmd;
596   }
597   else if (rc == RESP_OK)
598     lines = count_lines(sock->wbuf);
599   else
600     lines = -1;
601
602   rclen = sprintf(buffer, "%d ", lines);
603   va_start(argp, fmt);
604 #ifdef HAVE_VSNPRINTF
605   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
606 #else
607   len = vsprintf(buffer+rclen, fmt, argp);
608 #endif
609   va_end(argp);
610   if (len < 0)
611     return -1;
612
613   len += rclen;
614
615   /* append the result to the wbuf, don't write to the user */
616   if (sock->batch_start)
617     return add_to_wbuf(sock, buffer, len);
618
619   /* first write must be complete */
620   if (len != write(sock->fd, buffer, len))
621   {
622     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
623     return -1;
624   }
625
626   if (sock->wbuf != NULL && rc == RESP_OK)
627   {
628     wrote = 0;
629     while (wrote < sock->wbuf_len)
630     {
631       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
632       if (wb <= 0)
633       {
634         RRDD_LOG(LOG_INFO, "send_response: could not write results");
635         return -1;
636       }
637       wrote += wb;
638     }
639   }
640
641   free(sock->wbuf); sock->wbuf = NULL;
642   sock->wbuf_len = 0;
643
644   return 0;
645 } /* }}} */
646
647 static void wipe_ci_values(cache_item_t *ci, time_t when)
648 {
649   ci->values = NULL;
650   ci->values_num = 0;
651   ci->values_alloc = 0;
652
653   ci->last_flush_time = when;
654   if (config_write_jitter > 0)
655     ci->last_flush_time += (rrd_random() % config_write_jitter);
656 }
657
658 /* remove_from_queue
659  * remove a "cache_item_t" item from the queue.
660  * must hold 'cache_lock' when calling this
661  */
662 static void remove_from_queue(cache_item_t *ci) /* {{{ */
663 {
664   if (ci == NULL) return;
665   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
666
667   if (ci->prev == NULL)
668     cache_queue_head = ci->next; /* reset head */
669   else
670     ci->prev->next = ci->next;
671
672   if (ci->next == NULL)
673     cache_queue_tail = ci->prev; /* reset the tail */
674   else
675     ci->next->prev = ci->prev;
676
677   ci->next = ci->prev = NULL;
678   ci->flags &= ~CI_FLAGS_IN_QUEUE;
679
680   pthread_mutex_lock (&stats_lock);
681   assert (stats_queue_length > 0);
682   stats_queue_length--;
683   pthread_mutex_unlock (&stats_lock);
684
685 } /* }}} static void remove_from_queue */
686
687 /* free the resources associated with the cache_item_t
688  * must hold cache_lock when calling this function
689  */
690 static void *free_cache_item(cache_item_t *ci) /* {{{ */
691 {
692   if (ci == NULL) return NULL;
693
694   remove_from_queue(ci);
695
696   for (size_t i=0; i < ci->values_num; i++)
697     free(ci->values[i]);
698
699   free (ci->values);
700   free (ci->file);
701
702   /* in case anyone is waiting */
703   pthread_cond_broadcast(&ci->flushed);
704   pthread_cond_destroy(&ci->flushed);
705
706   free (ci);
707
708   return NULL;
709 } /* }}} static void *free_cache_item */
710
711 /*
712  * enqueue_cache_item:
713  * `cache_lock' must be acquired before calling this function!
714  */
715 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
716     queue_side_t side)
717 {
718   if (ci == NULL)
719     return (-1);
720
721   if (ci->values_num == 0)
722     return (0);
723
724   if (side == HEAD)
725   {
726     if (cache_queue_head == ci)
727       return 0;
728
729     /* remove if further down in queue */
730     remove_from_queue(ci);
731
732     ci->prev = NULL;
733     ci->next = cache_queue_head;
734     if (ci->next != NULL)
735       ci->next->prev = ci;
736     cache_queue_head = ci;
737
738     if (cache_queue_tail == NULL)
739       cache_queue_tail = cache_queue_head;
740   }
741   else /* (side == TAIL) */
742   {
743     /* We don't move values back in the list.. */
744     if (ci->flags & CI_FLAGS_IN_QUEUE)
745       return (0);
746
747     assert (ci->next == NULL);
748     assert (ci->prev == NULL);
749
750     ci->prev = cache_queue_tail;
751
752     if (cache_queue_tail == NULL)
753       cache_queue_head = ci;
754     else
755       cache_queue_tail->next = ci;
756
757     cache_queue_tail = ci;
758   }
759
760   ci->flags |= CI_FLAGS_IN_QUEUE;
761
762   pthread_cond_signal(&queue_cond);
763   pthread_mutex_lock (&stats_lock);
764   stats_queue_length++;
765   pthread_mutex_unlock (&stats_lock);
766
767   return (0);
768 } /* }}} int enqueue_cache_item */
769
770 /*
771  * tree_callback_flush:
772  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
773  * while this is in progress.
774  */
775 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
776     gpointer data)
777 {
778   cache_item_t *ci;
779   callback_flush_data_t *cfd;
780
781   ci = (cache_item_t *) value;
782   cfd = (callback_flush_data_t *) data;
783
784   if (ci->flags & CI_FLAGS_IN_QUEUE)
785     return FALSE;
786
787   if (ci->values_num > 0
788       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
789   {
790     enqueue_cache_item (ci, TAIL);
791   }
792   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
793       && (ci->values_num <= 0))
794   {
795     assert ((char *) key == ci->file);
796     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
797     {
798       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
799       return (FALSE);
800     }
801   }
802
803   return (FALSE);
804 } /* }}} gboolean tree_callback_flush */
805
806 static int flush_old_values (int max_age)
807 {
808   callback_flush_data_t cfd;
809   size_t k;
810
811   memset (&cfd, 0, sizeof (cfd));
812   /* Pass the current time as user data so that we don't need to call
813    * `time' for each node. */
814   cfd.now = time (NULL);
815   cfd.keys = NULL;
816   cfd.keys_num = 0;
817
818   if (max_age > 0)
819     cfd.abs_timeout = cfd.now - max_age;
820   else
821     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
822
823   /* `tree_callback_flush' will return the keys of all values that haven't
824    * been touched in the last `config_flush_interval' seconds in `cfd'.
825    * The char*'s in this array point to the same memory as ci->file, so we
826    * don't need to free them separately. */
827   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
828
829   for (k = 0; k < cfd.keys_num; k++)
830   {
831     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
832     /* should never fail, since we have held the cache_lock
833      * the entire time */
834     assert(status == TRUE);
835   }
836
837   if (cfd.keys != NULL)
838   {
839     free (cfd.keys);
840     cfd.keys = NULL;
841   }
842
843   return (0);
844 } /* int flush_old_values */
845
846 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
847 {
848   struct timeval now;
849   struct timespec next_flush;
850   int status;
851
852   gettimeofday (&now, NULL);
853   next_flush.tv_sec = now.tv_sec + config_flush_interval;
854   next_flush.tv_nsec = 1000 * now.tv_usec;
855
856   pthread_mutex_lock(&cache_lock);
857
858   while (state == RUNNING)
859   {
860     gettimeofday (&now, NULL);
861     if ((now.tv_sec > next_flush.tv_sec)
862         || ((now.tv_sec == next_flush.tv_sec)
863           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
864     {
865       RRDD_LOG(LOG_DEBUG, "flushing old values");
866
867       /* Determine the time of the next cache flush. */
868       next_flush.tv_sec = now.tv_sec + config_flush_interval;
869
870       /* Flush all values that haven't been written in the last
871        * `config_write_interval' seconds. */
872       flush_old_values (config_write_interval);
873
874       /* unlock the cache while we rotate so we don't block incoming
875        * updates if the fsync() blocks on disk I/O */
876       pthread_mutex_unlock(&cache_lock);
877       journal_rotate();
878       pthread_mutex_lock(&cache_lock);
879     }
880
881     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
882     if (status != 0 && status != ETIMEDOUT)
883     {
884       RRDD_LOG (LOG_ERR, "flush_thread_main: "
885                 "pthread_cond_timedwait returned %i.", status);
886     }
887   }
888
889   if (config_flush_at_shutdown)
890     flush_old_values (-1); /* flush everything */
891
892   state = SHUTDOWN;
893
894   pthread_mutex_unlock(&cache_lock);
895
896   return NULL;
897 } /* void *flush_thread_main */
898
899 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
900 {
901   pthread_mutex_lock (&cache_lock);
902
903   while (state != SHUTDOWN
904          || (cache_queue_head != NULL && config_flush_at_shutdown))
905   {
906     cache_item_t *ci;
907     char *file;
908     char **values;
909     size_t values_num;
910     int status;
911
912     /* Now, check if there's something to store away. If not, wait until
913      * something comes in. */
914     if (cache_queue_head == NULL)
915     {
916       status = pthread_cond_wait (&queue_cond, &cache_lock);
917       if ((status != 0) && (status != ETIMEDOUT))
918       {
919         RRDD_LOG (LOG_ERR, "queue_thread_main: "
920             "pthread_cond_wait returned %i.", status);
921       }
922     }
923
924     /* Check if a value has arrived. This may be NULL if we timed out or there
925      * was an interrupt such as a signal. */
926     if (cache_queue_head == NULL)
927       continue;
928
929     ci = cache_queue_head;
930
931     /* copy the relevant parts */
932     file = strdup (ci->file);
933     if (file == NULL)
934     {
935       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
936       continue;
937     }
938
939     assert(ci->values != NULL);
940     assert(ci->values_num > 0);
941
942     values = ci->values;
943     values_num = ci->values_num;
944
945     wipe_ci_values(ci, time(NULL));
946     remove_from_queue(ci);
947
948     pthread_mutex_unlock (&cache_lock);
949
950     rrd_clear_error ();
951     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
952     if (status != 0)
953     {
954       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
955           "rrd_update_r (%s) failed with status %i. (%s)",
956           file, status, rrd_get_error());
957     }
958
959     journal_write("wrote", file);
960
961     /* Search again in the tree.  It's possible someone issued a "FORGET"
962      * while we were writing the update values. */
963     pthread_mutex_lock(&cache_lock);
964     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
965     if (ci)
966       pthread_cond_broadcast(&ci->flushed);
967     pthread_mutex_unlock(&cache_lock);
968
969     if (status == 0)
970     {
971       pthread_mutex_lock (&stats_lock);
972       stats_updates_written++;
973       stats_data_sets_written += values_num;
974       pthread_mutex_unlock (&stats_lock);
975     }
976
977     rrd_free_ptrs((void ***) &values, &values_num);
978     free(file);
979
980     pthread_mutex_lock (&cache_lock);
981   }
982   pthread_mutex_unlock (&cache_lock);
983
984   return (NULL);
985 } /* }}} void *queue_thread_main */
986
987 static int buffer_get_field (char **buffer_ret, /* {{{ */
988     size_t *buffer_size_ret, char **field_ret)
989 {
990   char *buffer;
991   size_t buffer_pos;
992   size_t buffer_size;
993   char *field;
994   size_t field_size;
995   int status;
996
997   buffer = *buffer_ret;
998   buffer_pos = 0;
999   buffer_size = *buffer_size_ret;
1000   field = *buffer_ret;
1001   field_size = 0;
1002
1003   if (buffer_size <= 0)
1004     return (-1);
1005
1006   /* This is ensured by `handle_request'. */
1007   assert (buffer[buffer_size - 1] == '\0');
1008
1009   status = -1;
1010   while (buffer_pos < buffer_size)
1011   {
1012     /* Check for end-of-field or end-of-buffer */
1013     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1014     {
1015       field[field_size] = 0;
1016       field_size++;
1017       buffer_pos++;
1018       status = 0;
1019       break;
1020     }
1021     /* Handle escaped characters. */
1022     else if (buffer[buffer_pos] == '\\')
1023     {
1024       if (buffer_pos >= (buffer_size - 1))
1025         break;
1026       buffer_pos++;
1027       field[field_size] = buffer[buffer_pos];
1028       field_size++;
1029       buffer_pos++;
1030     }
1031     /* Normal operation */ 
1032     else
1033     {
1034       field[field_size] = buffer[buffer_pos];
1035       field_size++;
1036       buffer_pos++;
1037     }
1038   } /* while (buffer_pos < buffer_size) */
1039
1040   if (status != 0)
1041     return (status);
1042
1043   *buffer_ret = buffer + buffer_pos;
1044   *buffer_size_ret = buffer_size - buffer_pos;
1045   *field_ret = field;
1046
1047   return (0);
1048 } /* }}} int buffer_get_field */
1049
1050 /* if we're restricting writes to the base directory,
1051  * check whether the file falls within the dir
1052  * returns 1 if OK, otherwise 0
1053  */
1054 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1055 {
1056   assert(file != NULL);
1057
1058   if (!config_write_base_only
1059       || JOURNAL_REPLAY(sock)
1060       || config_base_dir == NULL)
1061     return 1;
1062
1063   if (strstr(file, "../") != NULL) goto err;
1064
1065   /* relative paths without "../" are ok */
1066   if (*file != '/') return 1;
1067
1068   /* file must be of the format base + "/" + <1+ char filename> */
1069   if (strlen(file) < _config_base_dir_len + 2) goto err;
1070   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1071   if (*(file + _config_base_dir_len) != '/') goto err;
1072
1073   return 1;
1074
1075 err:
1076   if (sock != NULL && sock->fd >= 0)
1077     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1078
1079   return 0;
1080 } /* }}} static int check_file_access */
1081
1082 /* when using a base dir, convert relative paths to absolute paths.
1083  * if necessary, modifies the "filename" pointer to point
1084  * to the new path created in "tmp".  "tmp" is provided
1085  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1086  *
1087  * this allows us to optimize for the expected case (absolute path)
1088  * with a no-op.
1089  */
1090 static void get_abs_path(char **filename, char *tmp)
1091 {
1092   assert(tmp != NULL);
1093   assert(filename != NULL && *filename != NULL);
1094
1095   if (config_base_dir == NULL || **filename == '/')
1096     return;
1097
1098   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1099   *filename = tmp;
1100 } /* }}} static int get_abs_path */
1101
1102 static int flush_file (const char *filename) /* {{{ */
1103 {
1104   cache_item_t *ci;
1105
1106   pthread_mutex_lock (&cache_lock);
1107
1108   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1109   if (ci == NULL)
1110   {
1111     pthread_mutex_unlock (&cache_lock);
1112     return (ENOENT);
1113   }
1114
1115   if (ci->values_num > 0)
1116   {
1117     /* Enqueue at head */
1118     enqueue_cache_item (ci, HEAD);
1119     pthread_cond_wait(&ci->flushed, &cache_lock);
1120   }
1121
1122   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1123    * may have been purged during our cond_wait() */
1124
1125   pthread_mutex_unlock(&cache_lock);
1126
1127   return (0);
1128 } /* }}} int flush_file */
1129
1130 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1131 {
1132   char *err = "Syntax error.\n";
1133
1134   if (cmd && cmd->syntax)
1135     err = cmd->syntax;
1136
1137   return send_response(sock, RESP_ERR, "Usage: %s", err);
1138 } /* }}} static int syntax_error() */
1139
1140 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1141 {
1142   uint64_t copy_queue_length;
1143   uint64_t copy_updates_received;
1144   uint64_t copy_flush_received;
1145   uint64_t copy_updates_written;
1146   uint64_t copy_data_sets_written;
1147   uint64_t copy_journal_bytes;
1148   uint64_t copy_journal_rotate;
1149
1150   uint64_t tree_nodes_number;
1151   uint64_t tree_depth;
1152
1153   pthread_mutex_lock (&stats_lock);
1154   copy_queue_length       = stats_queue_length;
1155   copy_updates_received   = stats_updates_received;
1156   copy_flush_received     = stats_flush_received;
1157   copy_updates_written    = stats_updates_written;
1158   copy_data_sets_written  = stats_data_sets_written;
1159   copy_journal_bytes      = stats_journal_bytes;
1160   copy_journal_rotate     = stats_journal_rotate;
1161   pthread_mutex_unlock (&stats_lock);
1162
1163   pthread_mutex_lock (&cache_lock);
1164   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1165   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1166   pthread_mutex_unlock (&cache_lock);
1167
1168   add_response_info(sock,
1169                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1170   add_response_info(sock,
1171                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1172   add_response_info(sock,
1173                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1174   add_response_info(sock,
1175                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1176   add_response_info(sock,
1177                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1178   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1179   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1180   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1181   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1182
1183   send_response(sock, RESP_OK, "Statistics follow\n");
1184
1185   return (0);
1186 } /* }}} int handle_request_stats */
1187
1188 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1189 {
1190   char *file, file_tmp[PATH_MAX];
1191   int status;
1192
1193   status = buffer_get_field (&buffer, &buffer_size, &file);
1194   if (status != 0)
1195   {
1196     return syntax_error(sock,cmd);
1197   }
1198   else
1199   {
1200     pthread_mutex_lock(&stats_lock);
1201     stats_flush_received++;
1202     pthread_mutex_unlock(&stats_lock);
1203
1204     get_abs_path(&file, file_tmp);
1205     if (!check_file_access(file, sock)) return 0;
1206
1207     status = flush_file (file);
1208     if (status == 0)
1209       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1210     else if (status == ENOENT)
1211     {
1212       /* no file in our tree; see whether it exists at all */
1213       struct stat statbuf;
1214
1215       memset(&statbuf, 0, sizeof(statbuf));
1216       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1217         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1218       else
1219         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1220     }
1221     else if (status < 0)
1222       return send_response(sock, RESP_ERR, "Internal error.\n");
1223     else
1224       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1225   }
1226
1227   /* NOTREACHED */
1228   assert(1==0);
1229 } /* }}} int handle_request_flush */
1230
1231 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1232 {
1233   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1234
1235   pthread_mutex_lock(&cache_lock);
1236   flush_old_values(-1);
1237   pthread_mutex_unlock(&cache_lock);
1238
1239   return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1241
1242 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1243 {
1244   int status;
1245   char *file, file_tmp[PATH_MAX];
1246   cache_item_t *ci;
1247
1248   status = buffer_get_field(&buffer, &buffer_size, &file);
1249   if (status != 0)
1250     return syntax_error(sock,cmd);
1251
1252   get_abs_path(&file, file_tmp);
1253
1254   pthread_mutex_lock(&cache_lock);
1255   ci = g_tree_lookup(cache_tree, file);
1256   if (ci == NULL)
1257   {
1258     pthread_mutex_unlock(&cache_lock);
1259     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1260   }
1261
1262   for (size_t i=0; i < ci->values_num; i++)
1263     add_response_info(sock, "%s\n", ci->values[i]);
1264
1265   pthread_mutex_unlock(&cache_lock);
1266   return send_response(sock, RESP_OK, "updates pending\n");
1267 } /* }}} static int handle_request_pending */
1268
1269 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1270 {
1271   int status;
1272   gboolean found;
1273   char *file, file_tmp[PATH_MAX];
1274
1275   status = buffer_get_field(&buffer, &buffer_size, &file);
1276   if (status != 0)
1277     return syntax_error(sock,cmd);
1278
1279   get_abs_path(&file, file_tmp);
1280   if (!check_file_access(file, sock)) return 0;
1281
1282   pthread_mutex_lock(&cache_lock);
1283   found = g_tree_remove(cache_tree, file);
1284   pthread_mutex_unlock(&cache_lock);
1285
1286   if (found == TRUE)
1287   {
1288     if (!JOURNAL_REPLAY(sock))
1289       journal_write("forget", file);
1290
1291     return send_response(sock, RESP_OK, "Gone!\n");
1292   }
1293   else
1294     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1295
1296   /* NOTREACHED */
1297   assert(1==0);
1298 } /* }}} static int handle_request_forget */
1299
1300 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1301 {
1302   cache_item_t *ci;
1303
1304   pthread_mutex_lock(&cache_lock);
1305
1306   ci = cache_queue_head;
1307   while (ci != NULL)
1308   {
1309     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1310     ci = ci->next;
1311   }
1312
1313   pthread_mutex_unlock(&cache_lock);
1314
1315   return send_response(sock, RESP_OK, "in queue.\n");
1316 } /* }}} int handle_request_queue */
1317
1318 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1319 {
1320   char *file, file_tmp[PATH_MAX];
1321   int values_num = 0;
1322   int status;
1323   char orig_buf[RRD_CMD_MAX];
1324
1325   cache_item_t *ci;
1326
1327   /* save it for the journal later */
1328   if (!JOURNAL_REPLAY(sock))
1329     strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1330
1331   status = buffer_get_field (&buffer, &buffer_size, &file);
1332   if (status != 0)
1333     return syntax_error(sock,cmd);
1334
1335   pthread_mutex_lock(&stats_lock);
1336   stats_updates_received++;
1337   pthread_mutex_unlock(&stats_lock);
1338
1339   get_abs_path(&file, file_tmp);
1340   if (!check_file_access(file, sock)) return 0;
1341
1342   pthread_mutex_lock (&cache_lock);
1343   ci = g_tree_lookup (cache_tree, file);
1344
1345   if (ci == NULL) /* {{{ */
1346   {
1347     struct stat statbuf;
1348     cache_item_t *tmp;
1349
1350     /* don't hold the lock while we setup; stat(2) might block */
1351     pthread_mutex_unlock(&cache_lock);
1352
1353     memset (&statbuf, 0, sizeof (statbuf));
1354     status = stat (file, &statbuf);
1355     if (status != 0)
1356     {
1357       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1358
1359       status = errno;
1360       if (status == ENOENT)
1361         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1362       else
1363         return send_response(sock, RESP_ERR,
1364                              "stat failed with error %i.\n", status);
1365     }
1366     if (!S_ISREG (statbuf.st_mode))
1367       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1368
1369     if (access(file, R_OK|W_OK) != 0)
1370       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1371                            file, rrd_strerror(errno));
1372
1373     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1374     if (ci == NULL)
1375     {
1376       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1377
1378       return send_response(sock, RESP_ERR, "malloc failed.\n");
1379     }
1380     memset (ci, 0, sizeof (cache_item_t));
1381
1382     ci->file = strdup (file);
1383     if (ci->file == NULL)
1384     {
1385       free (ci);
1386       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1387
1388       return send_response(sock, RESP_ERR, "strdup failed.\n");
1389     }
1390
1391     wipe_ci_values(ci, now);
1392     ci->flags = CI_FLAGS_IN_TREE;
1393     pthread_cond_init(&ci->flushed, NULL);
1394
1395     pthread_mutex_lock(&cache_lock);
1396
1397     /* another UPDATE might have added this entry in the meantime */
1398     tmp = g_tree_lookup (cache_tree, file);
1399     if (tmp == NULL)
1400       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1401     else
1402     {
1403       free_cache_item (ci);
1404       ci = tmp;
1405     }
1406
1407     /* state may have changed while we were unlocked */
1408     if (state == SHUTDOWN)
1409       return -1;
1410   } /* }}} */
1411   assert (ci != NULL);
1412
1413   /* don't re-write updates in replay mode */
1414   if (!JOURNAL_REPLAY(sock))
1415     journal_write("update", orig_buf);
1416
1417   while (buffer_size > 0)
1418   {
1419     char *value;
1420     double stamp;
1421     char *eostamp;
1422
1423     status = buffer_get_field (&buffer, &buffer_size, &value);
1424     if (status != 0)
1425     {
1426       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1427       break;
1428     }
1429
1430     /* make sure update time is always moving forward. We use double here since
1431        update does support subsecond precision for timestamps ... */
1432     stamp = strtod(value, &eostamp);
1433     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "Cannot find timestamp in '%s'!\n", value);
1438     }
1439     else if (stamp <= ci->last_update_stamp)
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "illegal attempt to update using time %lf when last"
1444                            " update time is %lf (minimum one second step)\n",
1445                            stamp, ci->last_update_stamp);
1446     }
1447     else
1448       ci->last_update_stamp = stamp;
1449
1450     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451                               &ci->values_alloc, config_alloc_chunk))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1456
1457     values_num++;
1458   }
1459
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1466
1467   pthread_mutex_unlock (&cache_lock);
1468
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1474
1475   /* NOTREACHED */
1476   assert(1==0);
1477
1478 } /* }}} int handle_request_update */
1479
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1481 {
1482   char *file, file_tmp[PATH_MAX];
1483   char *cf;
1484
1485   char *start_str;
1486   char *end_str;
1487   time_t start_tm;
1488   time_t end_tm;
1489
1490   unsigned long step;
1491   unsigned long ds_cnt;
1492   char **ds_namv;
1493   rrd_value_t *data;
1494
1495   int status;
1496   unsigned long i;
1497   time_t t;
1498   rrd_value_t *data_ptr;
1499
1500   file = NULL;
1501   cf = NULL;
1502   start_str = NULL;
1503   end_str = NULL;
1504
1505   /* Read the arguments */
1506   do /* while (0) */
1507   {
1508     status = buffer_get_field (&buffer, &buffer_size, &file);
1509     if (status != 0)
1510       break;
1511
1512     status = buffer_get_field (&buffer, &buffer_size, &cf);
1513     if (status != 0)
1514       break;
1515
1516     status = buffer_get_field (&buffer, &buffer_size, &start_str);
1517     if (status != 0)
1518     {
1519       start_str = NULL;
1520       status = 0;
1521       break;
1522     }
1523
1524     status = buffer_get_field (&buffer, &buffer_size, &end_str);
1525     if (status != 0)
1526     {
1527       end_str = NULL;
1528       status = 0;
1529       break;
1530     }
1531   } while (0);
1532
1533   if (status != 0)
1534     return (syntax_error(sock,cmd));
1535
1536   get_abs_path(&file, file_tmp);
1537   if (!check_file_access(file, sock)) return 0;
1538
1539   status = flush_file (file);
1540   if ((status != 0) && (status != ENOENT))
1541     return (send_response (sock, RESP_ERR,
1542           "flush_file (%s) failed with status %i.\n", file, status));
1543
1544   t = time (NULL); /* "now" */
1545
1546   /* Parse start time */
1547   if (start_str != NULL)
1548   {
1549     char *endptr;
1550     long value;
1551
1552     endptr = NULL;
1553     errno = 0;
1554     value = strtol (start_str, &endptr, /* base = */ 0);
1555     if ((endptr == start_str) || (errno != 0))
1556       return (send_response(sock, RESP_ERR,
1557             "Cannot parse start time `%s': Only simple integers are allowed.\n",
1558             start_str));
1559
1560     if (value > 0)
1561       start_tm = (time_t) value;
1562     else
1563       start_tm = (time_t) (t + value);
1564   }
1565   else
1566   {
1567     start_tm = t - 86400;
1568   }
1569
1570   /* Parse end time */
1571   if (end_str != NULL)
1572   {
1573     char *endptr;
1574     long value;
1575
1576     endptr = NULL;
1577     errno = 0;
1578     value = strtol (end_str, &endptr, /* base = */ 0);
1579     if ((endptr == end_str) || (errno != 0))
1580       return (send_response(sock, RESP_ERR,
1581             "Cannot parse end time `%s': Only simple integers are allowed.\n",
1582             end_str));
1583
1584     if (value > 0)
1585       end_tm = (time_t) value;
1586     else
1587       end_tm = (time_t) (t + value);
1588   }
1589   else
1590   {
1591     end_tm = t;
1592   }
1593
1594   step = -1;
1595   ds_cnt = 0;
1596   ds_namv = NULL;
1597   data = NULL;
1598
1599   status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1600       &ds_cnt, &ds_namv, &data);
1601   if (status != 0)
1602     return (send_response(sock, RESP_ERR,
1603           "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1604
1605   add_response_info (sock, "FlushVersion: %lu\n", 1);
1606   add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1607   add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1608   add_response_info (sock, "Step: %lu\n", step);
1609   add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1610
1611 #define SSTRCAT(buffer,str,buffer_fill) do { \
1612     size_t str_len = strlen (str); \
1613     if ((buffer_fill + str_len) > sizeof (buffer)) \
1614       str_len = sizeof (buffer) - buffer_fill; \
1615     if (str_len > 0) { \
1616       strncpy (buffer + buffer_fill, str, str_len); \
1617       buffer_fill += str_len; \
1618       assert (buffer_fill <= sizeof (buffer)); \
1619       if (buffer_fill == sizeof (buffer)) \
1620         buffer[buffer_fill - 1] = 0; \
1621       else \
1622         buffer[buffer_fill] = 0; \
1623     } \
1624   } while (0)
1625
1626   { /* Add list of DS names */
1627     char linebuf[1024];
1628     size_t linebuf_fill;
1629
1630     memset (linebuf, 0, sizeof (linebuf));
1631     linebuf_fill = 0;
1632     for (i = 0; i < ds_cnt; i++)
1633     {
1634       if (i > 0)
1635         SSTRCAT (linebuf, " ", linebuf_fill);
1636       SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1637       rrd_freemem(ds_namv[i]);
1638     }
1639     rrd_freemem(ds_namv);
1640     add_response_info (sock, "DSName: %s\n", linebuf);
1641   }
1642
1643   /* Add the actual data */
1644   assert (step > 0);
1645   data_ptr = data;
1646   for (t = start_tm + step; t <= end_tm; t += step)
1647   {
1648     char linebuf[1024];
1649     size_t linebuf_fill;
1650     char tmp[128];
1651
1652     memset (linebuf, 0, sizeof (linebuf));
1653     linebuf_fill = 0;
1654     for (i = 0; i < ds_cnt; i++)
1655     {
1656       snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1657       tmp[sizeof (tmp) - 1] = 0;
1658       SSTRCAT (linebuf, tmp, linebuf_fill);
1659
1660       data_ptr++;
1661     }
1662
1663     add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1664   } /* for (t) */
1665   rrd_freemem(data);
1666
1667   return (send_response (sock, RESP_OK, "Success\n"));
1668 #undef SSTRCAT
1669 } /* }}} int handle_request_fetch */
1670
1671 /* we came across a "WROTE" entry during journal replay.
1672  * throw away any values that we have accumulated for this file
1673  */
1674 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1675 {
1676   cache_item_t *ci;
1677   const char *file = buffer;
1678
1679   pthread_mutex_lock(&cache_lock);
1680
1681   ci = g_tree_lookup(cache_tree, file);
1682   if (ci == NULL)
1683   {
1684     pthread_mutex_unlock(&cache_lock);
1685     return (0);
1686   }
1687
1688   if (ci->values)
1689     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1690
1691   wipe_ci_values(ci, now);
1692   remove_from_queue(ci);
1693
1694   pthread_mutex_unlock(&cache_lock);
1695   return (0);
1696 } /* }}} int handle_request_wrote */
1697
1698 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1699 {
1700   char *file, file_tmp[PATH_MAX];
1701   int status;
1702   rrd_info_t *info;
1703
1704   /* obtain filename */
1705   status = buffer_get_field(&buffer, &buffer_size, &file);
1706   if (status != 0)
1707     return syntax_error(sock,cmd);
1708   /* get full pathname */
1709   get_abs_path(&file, file_tmp);
1710   if (!check_file_access(file, sock)) {
1711     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1712   }
1713   /* get data */
1714   rrd_clear_error ();
1715   info = rrd_info_r(file);
1716   if(!info) {
1717     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1718   }
1719   for (rrd_info_t *data = info; data != NULL; data = data->next) {
1720       switch (data->type) {
1721       case RD_I_VAL:
1722           if (isnan(data->value.u_val))
1723               add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1724           else
1725               add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1726           break;
1727       case RD_I_CNT:
1728           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1729           break;
1730       case RD_I_INT:
1731           add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1732           break;
1733       case RD_I_STR:
1734           add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1735           break;
1736       case RD_I_BLO:
1737           add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1738           break;
1739       }
1740   }
1741
1742   rrd_info_free(info);
1743
1744   return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info  */
1746
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1748 {
1749   char *i, *file, file_tmp[PATH_MAX];
1750   int status;
1751   int idx;
1752   time_t t;
1753
1754   /* obtain filename */
1755   status = buffer_get_field(&buffer, &buffer_size, &file);
1756   if (status != 0)
1757     return syntax_error(sock,cmd);
1758   /* get full pathname */
1759   get_abs_path(&file, file_tmp);
1760   if (!check_file_access(file, sock)) {
1761     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1762   }
1763
1764   status = buffer_get_field(&buffer, &buffer_size, &i);
1765   if (status != 0)
1766     return syntax_error(sock,cmd);
1767   idx = atoi(i);
1768   if(idx<0) { 
1769     return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1770   }
1771
1772   /* get data */
1773   rrd_clear_error ();
1774   t = rrd_first_r(file,idx);
1775   if(t<1) {
1776     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1777   }
1778   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_first  */
1780
1781
1782 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1783 {
1784   char *file, file_tmp[PATH_MAX];
1785   int status;
1786   time_t t, from_file, step;
1787   rrd_file_t * rrd_file;
1788   cache_item_t * ci;
1789   rrd_t rrd; 
1790
1791   /* obtain filename */
1792   status = buffer_get_field(&buffer, &buffer_size, &file);
1793   if (status != 0)
1794     return syntax_error(sock,cmd);
1795   /* get full pathname */
1796   get_abs_path(&file, file_tmp);
1797   if (!check_file_access(file, sock)) {
1798     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1799   }
1800   rrd_clear_error();
1801   rrd_init(&rrd);
1802   rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1803   if(!rrd_file) {
1804     return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1805   }
1806   from_file = rrd.live_head->last_up;
1807   step = rrd.stat_head->pdp_step;
1808   rrd_close(rrd_file);
1809   pthread_mutex_lock(&cache_lock);
1810   ci = g_tree_lookup(cache_tree, file);
1811   if (ci)
1812     t = ci->last_update_stamp;
1813   else
1814     t = from_file;
1815   pthread_mutex_unlock(&cache_lock);
1816   t -= t % step;
1817   rrd_free(&rrd);
1818   if(t<1) {
1819     return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1820   }
1821   return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1822 } /* }}} static int handle_request_last  */
1823
1824 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1825 {
1826   char *file, file_tmp[PATH_MAX];
1827   char *tok;
1828   int ac = 0;
1829   char *av[128];
1830   int status;
1831   unsigned long step = 300;
1832   time_t last_up = time(NULL)-10;
1833   int no_overwrite = opt_no_overwrite;
1834
1835
1836   /* obtain filename */
1837   status = buffer_get_field(&buffer, &buffer_size, &file);
1838   if (status != 0)
1839     return syntax_error(sock,cmd);
1840   /* get full pathname */
1841   get_abs_path(&file, file_tmp);
1842   if (!check_file_access(file, sock)) {
1843     return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1844   }
1845   RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1846
1847   while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1848     if( ! strncmp(tok,"-b",2) ) {
1849       status = buffer_get_field(&buffer, &buffer_size, &tok );
1850       if (status != 0) return syntax_error(sock,cmd);
1851       last_up = (time_t) atol(tok);
1852       continue;
1853     }
1854     if( ! strncmp(tok,"-s",2) ) {
1855       status = buffer_get_field(&buffer, &buffer_size, &tok );
1856       if (status != 0) return syntax_error(sock,cmd);
1857       step = atol(tok);
1858       continue;
1859     }
1860     if( ! strncmp(tok,"-O",2) ) {
1861       no_overwrite = 1;
1862       continue;
1863     }
1864     if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1865     if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1866     return syntax_error(sock,cmd);
1867   }
1868   if(step<1) {
1869     return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1870   }
1871   if (last_up < 3600 * 24 * 365 * 10) {
1872     return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1873   }
1874
1875   rrd_clear_error ();
1876   status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1877
1878   if(!status) {
1879     return send_response(sock, RESP_OK, "RRD created OK\n");
1880   }
1881   return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1882 } /* }}} static int handle_request_create  */
1883
1884 /* start "BATCH" processing */
1885 static int batch_start (HANDLER_PROTO) /* {{{ */
1886 {
1887   int status;
1888   if (sock->batch_start)
1889     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1890
1891   status = send_response(sock, RESP_OK,
1892                          "Go ahead.  End with dot '.' on its own line.\n");
1893   sock->batch_start = time(NULL);
1894   sock->batch_cmd = 0;
1895
1896   return status;
1897 } /* }}} static int batch_start */
1898
1899 /* finish "BATCH" processing and return results to the client */
1900 static int batch_done (HANDLER_PROTO) /* {{{ */
1901 {
1902   assert(sock->batch_start);
1903   sock->batch_start = 0;
1904   sock->batch_cmd  = 0;
1905   return send_response(sock, RESP_OK, "errors\n");
1906 } /* }}} static int batch_done */
1907
1908 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1909 {
1910   return -1;
1911 } /* }}} static int handle_request_quit */
1912
1913 static command_t list_of_commands[] = { /* {{{ */
1914   {
1915     "UPDATE",
1916     handle_request_update,
1917     CMD_CONTEXT_ANY,
1918     "UPDATE <filename> <values> [<values> ...]\n"
1919     ,
1920     "Adds the given file to the internal cache if it is not yet known and\n"
1921     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1922     "for details.\n"
1923     "\n"
1924     "Each <values> has the following form:\n"
1925     "  <values> = <time>:<value>[:<value>[...]]\n"
1926     "See the rrdupdate(1) manpage for details.\n"
1927   },
1928   {
1929     "WROTE",
1930     handle_request_wrote,
1931     CMD_CONTEXT_JOURNAL,
1932     NULL,
1933     NULL
1934   },
1935   {
1936     "FLUSH",
1937     handle_request_flush,
1938     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1939     "FLUSH <filename>\n"
1940     ,
1941     "Adds the given filename to the head of the update queue and returns\n"
1942     "after it has been dequeued.\n"
1943   },
1944   {
1945     "FLUSHALL",
1946     handle_request_flushall,
1947     CMD_CONTEXT_CLIENT,
1948     "FLUSHALL\n"
1949     ,
1950     "Triggers writing of all pending updates.  Returns immediately.\n"
1951   },
1952   {
1953     "PENDING",
1954     handle_request_pending,
1955     CMD_CONTEXT_CLIENT,
1956     "PENDING <filename>\n"
1957     ,
1958     "Shows any 'pending' updates for a file, in order.\n"
1959     "The updates shown have not yet been written to the underlying RRD file.\n"
1960   },
1961   {
1962     "FORGET",
1963     handle_request_forget,
1964     CMD_CONTEXT_ANY,
1965     "FORGET <filename>\n"
1966     ,
1967     "Removes the file completely from the cache.\n"
1968     "Any pending updates for the file will be lost.\n"
1969   },
1970   {
1971     "QUEUE",
1972     handle_request_queue,
1973     CMD_CONTEXT_CLIENT,
1974     "QUEUE\n"
1975     ,
1976         "Shows all files in the output queue.\n"
1977     "The output is zero or more lines in the following format:\n"
1978     "(where <num_vals> is the number of values to be written)\n"
1979     "\n"
1980     "<num_vals> <filename>\n"
1981   },
1982   {
1983     "STATS",
1984     handle_request_stats,
1985     CMD_CONTEXT_CLIENT,
1986     "STATS\n"
1987     ,
1988     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1989     "a description of the values.\n"
1990   },
1991   {
1992     "HELP",
1993     handle_request_help,
1994     CMD_CONTEXT_CLIENT,
1995     "HELP [<command>]\n",
1996     NULL, /* special! */
1997   },
1998   {
1999     "BATCH",
2000     batch_start,
2001     CMD_CONTEXT_CLIENT,
2002     "BATCH\n"
2003     ,
2004     "The 'BATCH' command permits the client to initiate a bulk load\n"
2005     "   of commands to rrdcached.\n"
2006     "\n"
2007     "Usage:\n"
2008     "\n"
2009     "    client: BATCH\n"
2010     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
2011     "    client: command #1\n"
2012     "    client: command #2\n"
2013     "    client: ... and so on\n"
2014     "    client: .\n"
2015     "    server: 2 errors\n"
2016     "    server: 7 message for command #7\n"
2017     "    server: 9 message for command #9\n"
2018     "\n"
2019     "For more information, consult the rrdcached(1) documentation.\n"
2020   },
2021   {
2022     ".",   /* BATCH terminator */
2023     batch_done,
2024     CMD_CONTEXT_BATCH,
2025     NULL,
2026     NULL
2027   },
2028   {
2029     "FETCH",
2030     handle_request_fetch,
2031     CMD_CONTEXT_CLIENT,
2032     "FETCH <file> <CF> [<start> [<end>]]\n"
2033     ,
2034     "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2035   },
2036   {
2037     "INFO",
2038     handle_request_info,
2039     CMD_CONTEXT_CLIENT,
2040     "INFO <filename>\n",
2041     "The INFO command retrieves information about a specified RRD file.\n"
2042     "This is returned in standard rrdinfo format, a sequence of lines\n"
2043     "with the format <keyname> = <value>\n"
2044     "Note that this is the data as of the last update of the RRD file itself,\n"
2045     "not the last time data was received via rrdcached, so there may be pending\n"
2046     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2047   },
2048   {
2049     "FIRST",
2050     handle_request_first,
2051     CMD_CONTEXT_CLIENT,
2052     "FIRST <filename> <rra index>\n",
2053     "The FIRST command retrieves the first data time for a specified RRA in\n"
2054     "an RRD file.\n"
2055   },
2056   {
2057     "LAST",
2058     handle_request_last,
2059     CMD_CONTEXT_CLIENT,
2060     "LAST <filename>\n",
2061     "The LAST command retrieves the last update time for a specified RRD file.\n"
2062     "Note that this is the time of the last update of the RRD file itself, not\n"
2063     "the last time data was received via rrdcached, so there may be pending\n"
2064     "updates in the queue.  If this bothers you, then first run a FLUSH.\n"
2065   },
2066   {
2067     "CREATE",
2068     handle_request_create,
2069     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2070     "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2071     "The CREATE command will create an RRD file, overwriting any existing file\n"
2072     "unless the -O option is given or rrdcached was started with the -O option.\n"
2073     "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2074     "not acceptable) and the step is in seconds (default is 300).\n"
2075     "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2076   },
2077   {
2078     "QUIT",
2079     handle_request_quit,
2080     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2081     "QUIT\n"
2082     ,
2083     "Disconnect from rrdcached.\n"
2084   }
2085 }; /* }}} command_t list_of_commands[] */
2086 static size_t list_of_commands_len = sizeof (list_of_commands)
2087   / sizeof (list_of_commands[0]);
2088
2089 static command_t *find_command(char *cmd)
2090 {
2091   size_t i;
2092
2093   for (i = 0; i < list_of_commands_len; i++)
2094     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2095       return (&list_of_commands[i]);
2096   return NULL;
2097 }
2098
2099 /* We currently use the index in the `list_of_commands' array as a bit position
2100  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2101  * outside these functions so that switching to a more elegant storage method
2102  * is easily possible. */
2103 static ssize_t find_command_index (const char *cmd) /* {{{ */
2104 {
2105   size_t i;
2106
2107   for (i = 0; i < list_of_commands_len; i++)
2108     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2109       return ((ssize_t) i);
2110   return (-1);
2111 } /* }}} ssize_t find_command_index */
2112
2113 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2114     const char *cmd)
2115 {
2116   ssize_t i;
2117
2118   if (JOURNAL_REPLAY(sock))
2119     return (1);
2120
2121   if (cmd == NULL)
2122     return (-1);
2123
2124   if ((strcasecmp ("QUIT", cmd) == 0)
2125       || (strcasecmp ("HELP", cmd) == 0))
2126     return (1);
2127   else if (strcmp (".", cmd) == 0)
2128     cmd = "BATCH";
2129
2130   i = find_command_index (cmd);
2131   if (i < 0)
2132     return (-1);
2133   assert (i < 32);
2134
2135   if ((sock->permissions & (1 << i)) != 0)
2136     return (1);
2137   return (0);
2138 } /* }}} int socket_permission_check */
2139
2140 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2141     const char *cmd)
2142 {
2143   ssize_t i;
2144
2145   i = find_command_index (cmd);
2146   if (i < 0)
2147     return (-1);
2148   assert (i < 32);
2149
2150   sock->permissions |= (1 << i);
2151   return (0);
2152 } /* }}} int socket_permission_add */
2153
2154 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2155 {
2156   sock->permissions = 0;
2157 } /* }}} socket_permission_clear */
2158
2159 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2160     listen_socket_t *src)
2161 {
2162   dest->permissions = src->permissions;
2163 } /* }}} socket_permission_copy */
2164
2165 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2166 {
2167   size_t i;
2168
2169   sock->permissions = 0;
2170   for (i = 0; i < list_of_commands_len; i++)
2171     sock->permissions |= (1 << i);
2172 } /* }}} void socket_permission_set_all */
2173
2174 /* check whether commands are received in the expected context */
2175 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2176 {
2177   if (JOURNAL_REPLAY(sock))
2178     return (cmd->context & CMD_CONTEXT_JOURNAL);
2179   else if (sock->batch_start)
2180     return (cmd->context & CMD_CONTEXT_BATCH);
2181   else
2182     return (cmd->context & CMD_CONTEXT_CLIENT);
2183
2184   /* NOTREACHED */
2185   assert(1==0);
2186 }
2187
2188 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2189 {
2190   int status;
2191   char *cmd_str;
2192   char *resp_txt;
2193   command_t *help = NULL;
2194
2195   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2196   if (status == 0)
2197     help = find_command(cmd_str);
2198
2199   if (help && (help->syntax || help->help))
2200   {
2201     char tmp[RRD_CMD_MAX];
2202
2203     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2204     resp_txt = tmp;
2205
2206     if (help->syntax)
2207       add_response_info(sock, "Usage: %s\n", help->syntax);
2208
2209     if (help->help)
2210       add_response_info(sock, "%s\n", help->help);
2211   }
2212   else
2213   {
2214     size_t i;
2215
2216     resp_txt = "Command overview\n";
2217
2218     for (i = 0; i < list_of_commands_len; i++)
2219     {
2220       if (list_of_commands[i].syntax == NULL)
2221         continue;
2222       add_response_info (sock, "%s", list_of_commands[i].syntax);
2223     }
2224   }
2225
2226   return send_response(sock, RESP_OK, resp_txt);
2227 } /* }}} int handle_request_help */
2228
2229 static int handle_request (DISPATCH_PROTO) /* {{{ */
2230 {
2231   char *buffer_ptr = buffer;
2232   char *cmd_str = NULL;
2233   command_t *cmd = NULL;
2234   int status;
2235
2236   assert (buffer[buffer_size - 1] == '\0');
2237
2238   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2239   if (status != 0)
2240   {
2241     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2242     return (-1);
2243   }
2244
2245   if (sock != NULL && sock->batch_start)
2246     sock->batch_cmd++;
2247
2248   cmd = find_command(cmd_str);
2249   if (!cmd)
2250     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2251
2252   if (!socket_permission_check (sock, cmd->cmd))
2253     return send_response(sock, RESP_ERR, "Permission denied.\n");
2254
2255   if (!command_check_context(sock, cmd))
2256     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2257
2258   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2259 } /* }}} int handle_request */
2260
2261 static void journal_set_free (journal_set *js) /* {{{ */
2262 {
2263   if (js == NULL)
2264     return;
2265
2266   rrd_free_ptrs((void ***) &js->files, &js->files_num);
2267
2268   free(js);
2269 } /* }}} journal_set_free */
2270
2271 static void journal_set_remove (journal_set *js) /* {{{ */
2272 {
2273   if (js == NULL)
2274     return;
2275
2276   for (uint i=0; i < js->files_num; i++)
2277   {
2278     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2279     unlink(js->files[i]);
2280   }
2281 } /* }}} journal_set_remove */
2282
2283 /* close current journal file handle.
2284  * MUST hold journal_lock before calling */
2285 static void journal_close(void) /* {{{ */
2286 {
2287   if (journal_fh != NULL)
2288   {
2289     if (fclose(journal_fh) != 0)
2290       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2291   }
2292
2293   journal_fh = NULL;
2294   journal_size = 0;
2295 } /* }}} journal_close */
2296
2297 /* MUST hold journal_lock before calling */
2298 static void journal_new_file(void) /* {{{ */
2299 {
2300   struct timeval now;
2301   int  new_fd;
2302   char new_file[PATH_MAX + 1];
2303
2304   assert(journal_dir != NULL);
2305   assert(journal_cur != NULL);
2306
2307   journal_close();
2308
2309   gettimeofday(&now, NULL);
2310   /* this format assures that the files sort in strcmp() order */
2311   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2312            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2313
2314   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2315                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2316   if (new_fd < 0)
2317     goto error;
2318
2319   journal_fh = fdopen(new_fd, "a");
2320   if (journal_fh == NULL)
2321     goto error;
2322
2323   journal_size = ftell(journal_fh);
2324   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2325
2326   /* record the file in the journal set */
2327   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2328
2329   return;
2330
2331 error:
2332   RRDD_LOG(LOG_CRIT,
2333            "JOURNALING DISABLED: Error while trying to create %s : %s",
2334            new_file, rrd_strerror(errno));
2335   RRDD_LOG(LOG_CRIT,
2336            "JOURNALING DISABLED: All values will be flushed at shutdown");
2337
2338   close(new_fd);
2339   config_flush_at_shutdown = 1;
2340
2341 } /* }}} journal_new_file */
2342
2343 /* MUST NOT hold journal_lock before calling this */
2344 static void journal_rotate(void) /* {{{ */
2345 {
2346   journal_set *old_js = NULL;
2347
2348   if (journal_dir == NULL)
2349     return;
2350
2351   RRDD_LOG(LOG_DEBUG, "rotating journals");
2352
2353   pthread_mutex_lock(&stats_lock);
2354   ++stats_journal_rotate;
2355   pthread_mutex_unlock(&stats_lock);
2356
2357   pthread_mutex_lock(&journal_lock);
2358
2359   journal_close();
2360
2361   /* rotate the journal sets */
2362   old_js = journal_old;
2363   journal_old = journal_cur;
2364   journal_cur = calloc(1, sizeof(journal_set));
2365
2366   if (journal_cur != NULL)
2367     journal_new_file();
2368   else
2369     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2370
2371   pthread_mutex_unlock(&journal_lock);
2372
2373   journal_set_remove(old_js);
2374   journal_set_free  (old_js);
2375
2376 } /* }}} static void journal_rotate */
2377
2378 /* MUST hold journal_lock when calling */
2379 static void journal_done(void) /* {{{ */
2380 {
2381   if (journal_cur == NULL)
2382     return;
2383
2384   journal_close();
2385
2386   if (config_flush_at_shutdown)
2387   {
2388     RRDD_LOG(LOG_INFO, "removing journals");
2389     journal_set_remove(journal_old);
2390     journal_set_remove(journal_cur);
2391   }
2392   else
2393   {
2394     RRDD_LOG(LOG_INFO, "expedited shutdown; "
2395              "journals will be used at next startup");
2396   }
2397
2398   journal_set_free(journal_cur);
2399   journal_set_free(journal_old);
2400   free(journal_dir);
2401
2402 } /* }}} static void journal_done */
2403
2404 static int journal_write(char *cmd, char *args) /* {{{ */
2405 {
2406   int chars;
2407
2408   if (journal_fh == NULL)
2409     return 0;
2410
2411   pthread_mutex_lock(&journal_lock);
2412   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2413   journal_size += chars;
2414
2415   if (journal_size > JOURNAL_MAX)
2416     journal_new_file();
2417
2418   pthread_mutex_unlock(&journal_lock);
2419
2420   if (chars > 0)
2421   {
2422     pthread_mutex_lock(&stats_lock);
2423     stats_journal_bytes += chars;
2424     pthread_mutex_unlock(&stats_lock);
2425   }
2426
2427   return chars;
2428 } /* }}} static int journal_write */
2429
2430 static int journal_replay (const char *file) /* {{{ */
2431 {
2432   FILE *fh;
2433   int entry_cnt = 0;
2434   int fail_cnt = 0;
2435   uint64_t line = 0;
2436   char entry[RRD_CMD_MAX];
2437   time_t now;
2438
2439   if (file == NULL) return 0;
2440
2441   {
2442     char *reason = "unknown error";
2443     int status = 0;
2444     struct stat statbuf;
2445
2446     memset(&statbuf, 0, sizeof(statbuf));
2447     if (stat(file, &statbuf) != 0)
2448     {
2449       reason = "stat error";
2450       status = errno;
2451     }
2452     else if (!S_ISREG(statbuf.st_mode))
2453     {
2454       reason = "not a regular file";
2455       status = EPERM;
2456     }
2457     if (statbuf.st_uid != daemon_uid)
2458     {
2459       reason = "not owned by daemon user";
2460       status = EACCES;
2461     }
2462     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2463     {
2464       reason = "must not be user/group writable";
2465       status = EACCES;
2466     }
2467
2468     if (status != 0)
2469     {
2470       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2471                file, rrd_strerror(status), reason);
2472       return 0;
2473     }
2474   }
2475
2476   fh = fopen(file, "r");
2477   if (fh == NULL)
2478   {
2479     if (errno != ENOENT)
2480       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2481                file, rrd_strerror(errno));
2482     return 0;
2483   }
2484   else
2485     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2486
2487   now = time(NULL);
2488
2489   while(!feof(fh))
2490   {
2491     size_t entry_len;
2492
2493     ++line;
2494     if (fgets(entry, sizeof(entry), fh) == NULL)
2495       break;
2496     entry_len = strlen(entry);
2497
2498     /* check \n termination in case journal writing crashed mid-line */
2499     if (entry_len == 0)
2500       continue;
2501     else if (entry[entry_len - 1] != '\n')
2502     {
2503       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2504       ++fail_cnt;
2505       continue;
2506     }
2507
2508     entry[entry_len - 1] = '\0';
2509
2510     if (handle_request(NULL, now, entry, entry_len) == 0)
2511       ++entry_cnt;
2512     else
2513       ++fail_cnt;
2514   }
2515
2516   fclose(fh);
2517
2518   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2519            entry_cnt, fail_cnt);
2520
2521   return entry_cnt > 0 ? 1 : 0;
2522 } /* }}} static int journal_replay */
2523
2524 static int journal_sort(const void *v1, const void *v2)
2525 {
2526   char **jn1 = (char **) v1;
2527   char **jn2 = (char **) v2;
2528
2529   return strcmp(*jn1,*jn2);
2530 }
2531
2532 static void journal_init(void) /* {{{ */
2533 {
2534   int had_journal = 0;
2535   DIR *dir;
2536   struct dirent *dent;
2537   char path[PATH_MAX+1];
2538
2539   if (journal_dir == NULL) return;
2540
2541   pthread_mutex_lock(&journal_lock);
2542
2543   journal_cur = calloc(1, sizeof(journal_set));
2544   if (journal_cur == NULL)
2545   {
2546     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2547     return;
2548   }
2549
2550   RRDD_LOG(LOG_INFO, "checking for journal files");
2551
2552   /* Handle old journal files during transition.  This gives them the
2553    * correct sort order.  TODO: remove after first release
2554    */
2555   {
2556     char old_path[PATH_MAX+1];
2557     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2558     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2559     rename(old_path, path);
2560
2561     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2562     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2563     rename(old_path, path);
2564   }
2565
2566   dir = opendir(journal_dir);
2567   if (!dir) {
2568     RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2569     return;
2570   }
2571   while ((dent = readdir(dir)) != NULL)
2572   {
2573     /* looks like a journal file? */
2574     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2575       continue;
2576
2577     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2578
2579     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2580     {
2581       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2582                dent->d_name);
2583       break;
2584     }
2585   }
2586   closedir(dir);
2587
2588   qsort(journal_cur->files, journal_cur->files_num,
2589         sizeof(journal_cur->files[0]), journal_sort);
2590
2591   for (uint i=0; i < journal_cur->files_num; i++)
2592     had_journal += journal_replay(journal_cur->files[i]);
2593
2594   journal_new_file();
2595
2596   /* it must have been a crash.  start a flush */
2597   if (had_journal && config_flush_at_shutdown)
2598     flush_old_values(-1);
2599
2600   pthread_mutex_unlock(&journal_lock);
2601
2602   RRDD_LOG(LOG_INFO, "journal processing complete");
2603
2604 } /* }}} static void journal_init */
2605
2606 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2607 {
2608   assert(sock != NULL);
2609
2610   free(sock->rbuf);  sock->rbuf = NULL;
2611   free(sock->wbuf);  sock->wbuf = NULL;
2612   free(sock);
2613 } /* }}} void free_listen_socket */
2614
2615 static void close_connection(listen_socket_t *sock) /* {{{ */
2616 {
2617   if (sock->fd >= 0)
2618   {
2619     close(sock->fd);
2620     sock->fd = -1;
2621   }
2622
2623   free_listen_socket(sock);
2624
2625 } /* }}} void close_connection */
2626
2627 static void *connection_thread_main (void *args) /* {{{ */
2628 {
2629   listen_socket_t *sock;
2630   int fd;
2631
2632   sock = (listen_socket_t *) args;
2633   fd = sock->fd;
2634
2635   /* init read buffers */
2636   sock->next_read = sock->next_cmd = 0;
2637   sock->rbuf = malloc(RBUF_SIZE);
2638   if (sock->rbuf == NULL)
2639   {
2640     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2641     close_connection(sock);
2642     return NULL;
2643   }
2644
2645   pthread_mutex_lock (&connection_threads_lock);
2646 #ifdef HAVE_LIBWRAP
2647   /* LIBWRAP does not support multiple threads! By putting this code
2648      inside pthread_mutex_lock we do not have to worry about request_info
2649      getting overwritten by another thread.
2650   */
2651   struct request_info req;
2652   request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2653   fromhost(&req);
2654   if(!hosts_access(&req)) {
2655     RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2656     pthread_mutex_unlock (&connection_threads_lock);
2657     close_connection(sock);
2658     return NULL;
2659   }
2660 #endif /* HAVE_LIBWRAP */
2661   connection_threads_num++;
2662   pthread_mutex_unlock (&connection_threads_lock);
2663
2664   while (state == RUNNING)
2665   {
2666     char *cmd;
2667     ssize_t cmd_len;
2668     ssize_t rbytes;
2669     time_t now;
2670
2671     struct pollfd pollfd;
2672     int status;
2673
2674     pollfd.fd = fd;
2675     pollfd.events = POLLIN | POLLPRI;
2676     pollfd.revents = 0;
2677
2678     status = poll (&pollfd, 1, /* timeout = */ 500);
2679     if (state != RUNNING)
2680       break;
2681     else if (status == 0) /* timeout */
2682       continue;
2683     else if (status < 0) /* error */
2684     {
2685       status = errno;
2686       if (status != EINTR)
2687         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2688       continue;
2689     }
2690
2691     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2692       break;
2693     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2694     {
2695       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2696           "poll(2) returned something unexpected: %#04hx",
2697           pollfd.revents);
2698       break;
2699     }
2700
2701     rbytes = read(fd, sock->rbuf + sock->next_read,
2702                   RBUF_SIZE - sock->next_read);
2703     if (rbytes < 0)
2704     {
2705       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2706       break;
2707     }
2708     else if (rbytes == 0)
2709       break; /* eof */
2710
2711     sock->next_read += rbytes;
2712
2713     if (sock->batch_start)
2714       now = sock->batch_start;
2715     else
2716       now = time(NULL);
2717
2718     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2719     {
2720       status = handle_request (sock, now, cmd, cmd_len+1);
2721       if (status != 0)
2722         goto out_close;
2723     }
2724   }
2725
2726 out_close:
2727   close_connection(sock);
2728
2729   /* Remove this thread from the connection threads list */
2730   pthread_mutex_lock (&connection_threads_lock);
2731   connection_threads_num--;
2732   if (connection_threads_num <= 0)
2733     pthread_cond_broadcast(&connection_threads_done);
2734   pthread_mutex_unlock (&connection_threads_lock);
2735
2736   return (NULL);
2737 } /* }}} void *connection_thread_main */
2738
2739 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2740 {
2741   int fd;
2742   struct sockaddr_un sa;
2743   listen_socket_t *temp;
2744   int status;
2745   const char *path;
2746   char *path_copy, *dir;
2747
2748   path = sock->addr;
2749   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2750     path += strlen("unix:");
2751
2752   /* dirname may modify its argument */
2753   path_copy = strdup(path);
2754   if (path_copy == NULL)
2755   {
2756     fprintf(stderr, "rrdcached: strdup(): %s\n",
2757         rrd_strerror(errno));
2758     return (-1);
2759   }
2760
2761   dir = dirname(path_copy);
2762   if (rrd_mkdir_p(dir, 0777) != 0)
2763   {
2764     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2765         dir, rrd_strerror(errno));
2766     return (-1);
2767   }
2768
2769   free(path_copy);
2770
2771   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2772       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2773   if (temp == NULL)
2774   {
2775     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2776     return (-1);
2777   }
2778   listen_fds = temp;
2779   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2780
2781   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2782   if (fd < 0)
2783   {
2784     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2785              rrd_strerror(errno));
2786     return (-1);
2787   }
2788
2789   memset (&sa, 0, sizeof (sa));
2790   sa.sun_family = AF_UNIX;
2791   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2792
2793   /* if we've gotten this far, we own the pid file.  any daemon started
2794    * with the same args must not be alive.  therefore, ensure that we can
2795    * create the socket...
2796    */
2797   unlink(path);
2798
2799   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2800   if (status != 0)
2801   {
2802     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2803              path, rrd_strerror(errno));
2804     close (fd);
2805     return (-1);
2806   }
2807
2808   /* tweak the sockets group ownership */
2809   if (sock->socket_group != (gid_t)-1)
2810   {
2811     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2812          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2813     {
2814       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2815     }
2816   }
2817
2818   if (sock->socket_permissions != (mode_t)-1)
2819   {
2820     if (chmod(path, sock->socket_permissions) != 0)
2821       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2822           (unsigned int)sock->socket_permissions, strerror(errno));
2823   }
2824
2825   status = listen (fd, /* backlog = */ 10);
2826   if (status != 0)
2827   {
2828     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2829              path, rrd_strerror(errno));
2830     close (fd);
2831     unlink (path);
2832     return (-1);
2833   }
2834
2835   listen_fds[listen_fds_num].fd = fd;
2836   listen_fds[listen_fds_num].family = PF_UNIX;
2837   strncpy(listen_fds[listen_fds_num].addr, path,
2838           sizeof (listen_fds[listen_fds_num].addr) - 1);
2839   listen_fds_num++;
2840
2841   return (0);
2842 } /* }}} int open_listen_socket_unix */
2843
2844 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2845 {
2846   struct addrinfo ai_hints;
2847   struct addrinfo *ai_res;
2848   struct addrinfo *ai_ptr;
2849   char addr_copy[NI_MAXHOST];
2850   char *addr;
2851   char *port;
2852   int status;
2853
2854   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2855   addr_copy[sizeof (addr_copy) - 1] = 0;
2856   addr = addr_copy;
2857
2858   memset (&ai_hints, 0, sizeof (ai_hints));
2859   ai_hints.ai_flags = 0;
2860 #ifdef AI_ADDRCONFIG
2861   ai_hints.ai_flags |= AI_ADDRCONFIG;
2862 #endif
2863   ai_hints.ai_family = AF_UNSPEC;
2864   ai_hints.ai_socktype = SOCK_STREAM;
2865
2866   port = NULL;
2867   if (*addr == '[') /* IPv6+port format */
2868   {
2869     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2870     addr++;
2871
2872     port = strchr (addr, ']');
2873     if (port == NULL)
2874     {
2875       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2876       return (-1);
2877     }
2878     *port = 0;
2879     port++;
2880
2881     if (*port == ':')
2882       port++;
2883     else if (*port == 0)
2884       port = NULL;
2885     else
2886     {
2887       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2888       return (-1);
2889     }
2890   } /* if (*addr == '[') */
2891   else
2892   {
2893     port = rindex(addr, ':');
2894     if (port != NULL)
2895     {
2896       *port = 0;
2897       port++;
2898     }
2899   }
2900   ai_res = NULL;
2901   status = getaddrinfo (addr,
2902                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2903                         &ai_hints, &ai_res);
2904   if (status != 0)
2905   {
2906     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2907              addr, gai_strerror (status));
2908     return (-1);
2909   }
2910
2911   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2912   {
2913     int fd;
2914     listen_socket_t *temp;
2915     int one = 1;
2916
2917     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2918         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2919     if (temp == NULL)
2920     {
2921       fprintf (stderr,
2922                "rrdcached: open_listen_socket_network: realloc failed.\n");
2923       continue;
2924     }
2925     listen_fds = temp;
2926     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2927
2928     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2929     if (fd < 0)
2930     {
2931       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2932                rrd_strerror(errno));
2933       continue;
2934     }
2935
2936     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2937
2938     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2939     if (status != 0)
2940     {
2941       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2942                sock->addr, rrd_strerror(errno));
2943       close (fd);
2944       continue;
2945     }
2946
2947     status = listen (fd, /* backlog = */ 10);
2948     if (status != 0)
2949     {
2950       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2951                sock->addr, rrd_strerror(errno));
2952       close (fd);
2953       freeaddrinfo(ai_res);
2954       return (-1);
2955     }
2956
2957     listen_fds[listen_fds_num].fd = fd;
2958     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2959     listen_fds_num++;
2960   } /* for (ai_ptr) */
2961
2962   freeaddrinfo(ai_res);
2963   return (0);
2964 } /* }}} static int open_listen_socket_network */
2965
2966 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2967 {
2968   assert(sock != NULL);
2969   assert(sock->addr != NULL);
2970
2971   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2972       || sock->addr[0] == '/')
2973     return (open_listen_socket_unix(sock));
2974   else
2975     return (open_listen_socket_network(sock));
2976 } /* }}} int open_listen_socket */
2977
2978 static int close_listen_sockets (void) /* {{{ */
2979 {
2980   size_t i;
2981
2982   for (i = 0; i < listen_fds_num; i++)
2983   {
2984     close (listen_fds[i].fd);
2985
2986     if (listen_fds[i].family == PF_UNIX)
2987       unlink(listen_fds[i].addr);
2988   }
2989
2990   free (listen_fds);
2991   listen_fds = NULL;
2992   listen_fds_num = 0;
2993
2994   return (0);
2995 } /* }}} int close_listen_sockets */
2996
2997 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2998 {
2999   struct pollfd *pollfds;
3000   int pollfds_num;
3001   int status;
3002   int i;
3003
3004   if (listen_fds_num < 1)
3005   {
3006     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3007     return (NULL);
3008   }
3009
3010   pollfds_num = listen_fds_num;
3011   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3012   if (pollfds == NULL)
3013   {
3014     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3015     return (NULL);
3016   }
3017   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3018
3019   RRDD_LOG(LOG_INFO, "listening for connections");
3020
3021   while (state == RUNNING)
3022   {
3023     for (i = 0; i < pollfds_num; i++)
3024     {
3025       pollfds[i].fd = listen_fds[i].fd;
3026       pollfds[i].events = POLLIN | POLLPRI;
3027       pollfds[i].revents = 0;
3028     }
3029
3030     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3031     if (state != RUNNING)
3032       break;
3033     else if (status == 0) /* timeout */
3034       continue;
3035     else if (status < 0) /* error */
3036     {
3037       status = errno;
3038       if (status != EINTR)
3039       {
3040         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3041       }
3042       continue;
3043     }
3044
3045     for (i = 0; i < pollfds_num; i++)
3046     {
3047       listen_socket_t *client_sock;
3048       struct sockaddr_storage client_sa;
3049       socklen_t client_sa_size;
3050       pthread_t tid;
3051       pthread_attr_t attr;
3052
3053       if (pollfds[i].revents == 0)
3054         continue;
3055
3056       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3057       {
3058         RRDD_LOG (LOG_ERR, "listen_thread_main: "
3059             "poll(2) returned something unexpected for listen FD #%i.",
3060             pollfds[i].fd);
3061         continue;
3062       }
3063
3064       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3065       if (client_sock == NULL)
3066       {
3067         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3068         continue;
3069       }
3070       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3071
3072       client_sa_size = sizeof (client_sa);
3073       client_sock->fd = accept (pollfds[i].fd,
3074           (struct sockaddr *) &client_sa, &client_sa_size);
3075       if (client_sock->fd < 0)
3076       {
3077         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3078         free(client_sock);
3079         continue;
3080       }
3081
3082       pthread_attr_init (&attr);
3083       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3084
3085       status = pthread_create (&tid, &attr, connection_thread_main,
3086                                client_sock);
3087       if (status != 0)
3088       {
3089         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3090         close_connection(client_sock);
3091         continue;
3092       }
3093     } /* for (pollfds_num) */
3094   } /* while (state == RUNNING) */
3095
3096   RRDD_LOG(LOG_INFO, "starting shutdown");
3097
3098   close_listen_sockets ();
3099
3100   pthread_mutex_lock (&connection_threads_lock);
3101   while (connection_threads_num > 0)
3102     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3103   pthread_mutex_unlock (&connection_threads_lock);
3104
3105   free(pollfds);
3106
3107   return (NULL);
3108 } /* }}} void *listen_thread_main */
3109
3110 static int daemonize (void) /* {{{ */
3111 {
3112   int pid_fd;
3113   char *base_dir;
3114
3115   daemon_uid = geteuid();
3116
3117   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3118   if (pid_fd < 0)
3119     pid_fd = check_pidfile();
3120   if (pid_fd < 0)
3121     return pid_fd;
3122
3123   /* open all the listen sockets */
3124   if (config_listen_address_list_len > 0)
3125   {
3126     for (size_t i = 0; i < config_listen_address_list_len; i++)
3127       open_listen_socket (config_listen_address_list[i]);
3128
3129     rrd_free_ptrs((void ***) &config_listen_address_list,
3130                   &config_listen_address_list_len);
3131   }
3132   else
3133   {
3134     strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3135         sizeof(default_socket.addr) - 1);
3136     default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3137
3138     if (default_socket.permissions == 0)
3139       socket_permission_set_all (&default_socket);
3140
3141     open_listen_socket (&default_socket);
3142   }
3143
3144   if (listen_fds_num < 1)
3145   {
3146     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3147     goto error;
3148   }
3149
3150   if (!stay_foreground)
3151   {
3152     pid_t child;
3153
3154     child = fork ();
3155     if (child < 0)
3156     {
3157       fprintf (stderr, "daemonize: fork(2) failed.\n");
3158       goto error;
3159     }
3160     else if (child > 0)
3161       exit(0);
3162
3163     /* Become session leader */
3164     setsid ();
3165
3166     /* Open the first three file descriptors to /dev/null */
3167     close (2);
3168     close (1);
3169     close (0);
3170
3171     open ("/dev/null", O_RDWR);
3172     if (dup(0) == -1 || dup(0) == -1){
3173         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3174     }
3175   } /* if (!stay_foreground) */
3176
3177   /* Change into the /tmp directory. */
3178   base_dir = (config_base_dir != NULL)
3179     ? config_base_dir
3180     : "/tmp";
3181
3182   if (chdir (base_dir) != 0)
3183   {
3184     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3185     goto error;
3186   }
3187
3188   install_signal_handlers();
3189
3190   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3191   RRDD_LOG(LOG_INFO, "starting up");
3192
3193   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3194                                 (GDestroyNotify) free_cache_item);
3195   if (cache_tree == NULL)
3196   {
3197     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3198     goto error;
3199   }
3200
3201   return write_pidfile (pid_fd);
3202
3203 error:
3204   remove_pidfile();
3205   return -1;
3206 } /* }}} int daemonize */
3207
3208 static int cleanup (void) /* {{{ */
3209 {
3210   pthread_cond_broadcast (&flush_cond);
3211   pthread_join (flush_thread, NULL);
3212
3213   pthread_cond_broadcast (&queue_cond);
3214   for (int i = 0; i < config_queue_threads; i++)
3215     pthread_join (queue_threads[i], NULL);
3216
3217   if (config_flush_at_shutdown)
3218   {
3219     assert(cache_queue_head == NULL);
3220     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3221   }
3222
3223   free(queue_threads);
3224   free(config_base_dir);
3225
3226   pthread_mutex_lock(&cache_lock);
3227   g_tree_destroy(cache_tree);
3228
3229   pthread_mutex_lock(&journal_lock);
3230   journal_done();
3231
3232   RRDD_LOG(LOG_INFO, "goodbye");
3233   closelog ();
3234
3235   remove_pidfile ();
3236   free(config_pid_file);
3237
3238   return (0);
3239 } /* }}} int cleanup */
3240
3241 static int read_options (int argc, char **argv) /* {{{ */
3242 {
3243   int option;
3244   int status = 0;
3245
3246   socket_permission_clear (&default_socket);
3247
3248   default_socket.socket_group = (gid_t)-1;
3249   default_socket.socket_permissions = (mode_t)-1;
3250
3251   while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3252   {
3253     switch (option)
3254     {
3255       case 'O':
3256         opt_no_overwrite = 1;
3257         break;
3258
3259       case 'g':
3260         stay_foreground=1;
3261         break;
3262
3263       case 'l':
3264       {
3265         listen_socket_t *new;
3266
3267         new = malloc(sizeof(listen_socket_t));
3268         if (new == NULL)
3269         {
3270           fprintf(stderr, "read_options: malloc failed.\n");
3271           return(2);
3272         }
3273         memset(new, 0, sizeof(listen_socket_t));
3274
3275         strncpy(new->addr, optarg, sizeof(new->addr)-1);
3276
3277         /* Add permissions to the socket {{{ */
3278         if (default_socket.permissions != 0)
3279         {
3280           socket_permission_copy (new, &default_socket);
3281         }
3282         else /* if (default_socket.permissions == 0) */
3283         {
3284           /* Add permission for ALL commands to the socket. */
3285           socket_permission_set_all (new);
3286         }
3287         /* }}} Done adding permissions. */
3288
3289         new->socket_group = default_socket.socket_group;
3290         new->socket_permissions = default_socket.socket_permissions;
3291
3292         if (!rrd_add_ptr((void ***)&config_listen_address_list,
3293                          &config_listen_address_list_len, new))
3294         {
3295           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3296           return (2);
3297         }
3298       }
3299       break;
3300
3301       /* set socket group permissions */
3302       case 's':
3303       {
3304         gid_t group_gid;
3305         struct group *grp;
3306
3307         group_gid = strtoul(optarg, NULL, 10);
3308         if (errno != EINVAL && group_gid>0)
3309         {
3310           /* we were passed a number */
3311           grp = getgrgid(group_gid);
3312         }
3313         else
3314         {
3315           grp = getgrnam(optarg);
3316         }
3317
3318         if (grp)
3319         {
3320           default_socket.socket_group = grp->gr_gid;
3321         }
3322         else
3323         {
3324           /* no idea what the user wanted... */
3325           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3326           return (5);
3327         }
3328       }
3329       break;
3330
3331       /* set socket file permissions */
3332       case 'm':
3333       {
3334         long  tmp;
3335         char *endptr = NULL;
3336
3337         tmp = strtol (optarg, &endptr, 8);
3338         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3339             || (tmp > 07777) || (tmp < 0)) {
3340           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3341               optarg);
3342           return (5);
3343         }
3344
3345         default_socket.socket_permissions = (mode_t)tmp;
3346       }
3347       break;
3348
3349       case 'P':
3350       {
3351         char *optcopy;
3352         char *saveptr;
3353         char *dummy;
3354         char *ptr;
3355
3356         socket_permission_clear (&default_socket);
3357
3358         optcopy = strdup (optarg);
3359         dummy = optcopy;
3360         saveptr = NULL;
3361         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3362         {
3363           dummy = NULL;
3364           status = socket_permission_add (&default_socket, ptr);
3365           if (status != 0)
3366           {
3367             fprintf (stderr, "read_options: Adding permission \"%s\" to "
3368                 "socket failed. Most likely, this permission doesn't "
3369                 "exist. Check your command line.\n", ptr);
3370             status = 4;
3371           }
3372         }
3373
3374         free (optcopy);
3375       }
3376       break;
3377
3378       case 'f':
3379       {
3380         int temp;
3381
3382         temp = atoi (optarg);
3383         if (temp > 0)
3384           config_flush_interval = temp;
3385         else
3386         {
3387           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3388           status = 3;
3389         }
3390       }
3391       break;
3392
3393       case 'w':
3394       {
3395         int temp;
3396
3397         temp = atoi (optarg);
3398         if (temp > 0)
3399           config_write_interval = temp;
3400         else
3401         {
3402           fprintf (stderr, "Invalid write interval: %s\n", optarg);
3403           status = 2;
3404         }
3405       }
3406       break;
3407
3408       case 'z':
3409       {
3410         int temp;
3411
3412         temp = atoi(optarg);
3413         if (temp > 0)
3414           config_write_jitter = temp;
3415         else
3416         {
3417           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3418           status = 2;
3419         }
3420
3421         break;
3422       }
3423
3424       case 't':
3425       {
3426         int threads;
3427         threads = atoi(optarg);
3428         if (threads >= 1)
3429           config_queue_threads = threads;
3430         else
3431         {
3432           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3433           return 1;
3434         }
3435       }
3436       break;
3437
3438       case 'B':
3439         config_write_base_only = 1;
3440         break;
3441
3442       case 'b':
3443       {
3444         size_t len;
3445         char base_realpath[PATH_MAX];
3446
3447         if (config_base_dir != NULL)
3448           free (config_base_dir);
3449         config_base_dir = strdup (optarg);
3450         if (config_base_dir == NULL)
3451         {
3452           fprintf (stderr, "read_options: strdup failed.\n");
3453           return (3);
3454         }
3455
3456         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3457         {
3458           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3459               config_base_dir, rrd_strerror (errno));
3460           return (3);
3461         }
3462
3463         /* make sure that the base directory is not resolved via
3464          * symbolic links.  this makes some performance-enhancing
3465          * assumptions possible (we don't have to resolve paths
3466          * that start with a "/")
3467          */
3468         if (realpath(config_base_dir, base_realpath) == NULL)
3469         {
3470           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3471               "%s\n", config_base_dir, rrd_strerror(errno));
3472           return 5;
3473         }
3474
3475         len = strlen (config_base_dir);
3476         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3477         {
3478           config_base_dir[len - 1] = 0;
3479           len--;
3480         }
3481
3482         if (len < 1)
3483         {
3484           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3485           return (4);
3486         }
3487
3488         _config_base_dir_len = len;
3489
3490         len = strlen (base_realpath);
3491         while ((len > 0) && (base_realpath[len - 1] == '/'))
3492         {
3493           base_realpath[len - 1] = '\0';
3494           len--;
3495         }
3496
3497         if (strncmp(config_base_dir,
3498                          base_realpath, sizeof(base_realpath)) != 0)
3499         {
3500           fprintf(stderr,
3501                   "Base directory (-b) resolved via file system links!\n"
3502                   "Please consult rrdcached '-b' documentation!\n"
3503                   "Consider specifying the real directory (%s)\n",
3504                   base_realpath);
3505           return 5;
3506         }
3507       }
3508       break;
3509
3510       case 'p':
3511       {
3512         if (config_pid_file != NULL)
3513           free (config_pid_file);
3514         config_pid_file = strdup (optarg);
3515         if (config_pid_file == NULL)
3516         {
3517           fprintf (stderr, "read_options: strdup failed.\n");
3518           return (3);
3519         }
3520       }
3521       break;
3522
3523       case 'F':
3524         config_flush_at_shutdown = 1;
3525         break;
3526
3527       case 'j':
3528       {
3529         char journal_dir_actual[PATH_MAX];
3530         journal_dir = realpath((const char *)optarg, journal_dir_actual);
3531         if (journal_dir)
3532         {
3533           // if we were able to properly resolve the path, lets have a copy
3534           // for use outside this block.
3535           journal_dir = strdup(journal_dir);           
3536           status = rrd_mkdir_p(journal_dir, 0777);
3537           if (status != 0)
3538           {
3539             fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3540                     journal_dir, rrd_strerror(errno));
3541             return 6;
3542           }
3543           if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3544           {
3545             fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3546                     errno ? rrd_strerror(errno) : "");
3547             return 6;
3548           }
3549         } else {
3550           fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3551                   errno ? rrd_strerror(errno) : "");
3552           return 6;
3553         }
3554       }
3555       break;
3556
3557       case 'a':
3558       {
3559         int temp = atoi(optarg);
3560         if (temp > 0)
3561           config_alloc_chunk = temp;
3562         else
3563         {
3564           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3565           return 10;
3566         }
3567       }
3568       break;
3569
3570       case 'h':
3571       case '?':
3572         printf ("RRDCacheD %s\n"
3573             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3574             "\n"
3575             "Usage: rrdcached [options]\n"
3576             "\n"
3577             "Valid options are:\n"
3578             "  -l <address>  Socket address to listen to.\n"
3579             "                Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3580             "  -P <perms>    Sets the permissions to assign to all following "
3581                             "sockets\n"
3582             "  -w <seconds>  Interval in which to write data.\n"
3583             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3584             "  -t <threads>  Number of write threads.\n"
3585             "  -f <seconds>  Interval in which to flush dead data.\n"
3586             "  -p <file>     Location of the PID-file.\n"
3587             "  -b <dir>      Base directory to change to.\n"
3588             "  -B            Restrict file access to paths within -b <dir>\n"
3589             "  -g            Do not fork and run in the foreground.\n"
3590             "  -j <dir>      Directory in which to create the journal files.\n"
3591             "  -F            Always flush all updates at shutdown\n"
3592             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3593             "                (the socket will also have read/write permissions "
3594                             "for that group)\n"
3595             "  -m <mode>     File permissions (octal) of all following UNIX "
3596                             "sockets\n"
3597             "  -a <size>     Memory allocation chunk size. Default is 1.\n"
3598             "  -O            Do not allow CREATE commands to overwrite existing\n"
3599             "                files, even if asked to.\n"
3600             "\n"
3601             "For more information and a detailed description of all options "
3602             "please refer\n"
3603             "to the rrdcached(1) manual page.\n",
3604             VERSION);
3605         if (option == 'h')
3606           status = -1;
3607         else
3608           status = 1;
3609         break;
3610     } /* switch (option) */
3611   } /* while (getopt) */
3612
3613   /* advise the user when values are not sane */
3614   if (config_flush_interval < 2 * config_write_interval)
3615     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3616             " 2x write interval (-w) !\n");
3617   if (config_write_jitter > config_write_interval)
3618     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3619             " write interval (-w) !\n");
3620
3621   if (config_write_base_only && config_base_dir == NULL)
3622     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3623             "  Consult the rrdcached documentation\n");
3624
3625   if (journal_dir == NULL)
3626     config_flush_at_shutdown = 1;
3627
3628   return (status);
3629 } /* }}} int read_options */
3630
3631 int main (int argc, char **argv)
3632 {
3633   int status;
3634
3635   status = read_options (argc, argv);
3636   if (status != 0)
3637   {
3638     if (status < 0)
3639       status = 0;
3640     return (status);
3641   }
3642
3643   status = daemonize ();
3644   if (status != 0)
3645   {
3646     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3647     return (1);
3648   }
3649
3650   journal_init();
3651
3652   /* start the queue threads */
3653   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3654   if (queue_threads == NULL)
3655   {
3656     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3657     cleanup();
3658     return (1);
3659   }
3660   for (int i = 0; i < config_queue_threads; i++)
3661   {
3662     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3663     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3664     if (status != 0)
3665     {
3666       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3667       cleanup();
3668       return (1);
3669     }
3670   }
3671
3672   /* start the flush thread */
3673   memset(&flush_thread, 0, sizeof(flush_thread));
3674   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3675   if (status != 0)
3676   {
3677     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3678     cleanup();
3679     return (1);
3680   }
3681
3682   listen_thread_main (NULL);
3683   cleanup ();
3684
3685   return (0);
3686 } /* int main */
3687
3688 /*
3689  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3690  */