rrd_random() is a wrapper around random() that ensures the PRNG is seeded
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   int values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 static int do_shutdown = 0;
219
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
223
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
230
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
236
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
245
246 static listen_socket_t **config_listen_address_list = NULL;
247 static int config_listen_address_list_len = 0;
248
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
266
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
269
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
280
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
282 {
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
285
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
287 {
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
290
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
296
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
302
303 static void install_signal_handlers(void) /* {{{ */
304 {
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
312
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
317
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
321
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
329
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
333
334 } /* }}} void install_signal_handlers */
335
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338   int fd;
339   char *file;
340
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
344
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
349
350   return(fd);
351 } /* }}} static int open_pidfile */
352
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
355 {
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
359
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
363
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
366
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
370
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
380
381   lseek(pid_fd, 0, SEEK_SET);
382   ftruncate(pid_fd, 0);
383
384   fprintf(stderr,
385           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
386           "rrdcached: starting normally.\n", pid);
387
388   return pid_fd;
389 } /* }}} static int check_pidfile */
390
391 static int write_pidfile (int fd) /* {{{ */
392 {
393   pid_t pid;
394   FILE *fh;
395
396   pid = getpid ();
397
398   fh = fdopen (fd, "w");
399   if (fh == NULL)
400   {
401     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
402     close(fd);
403     return (-1);
404   }
405
406   fprintf (fh, "%i\n", (int) pid);
407   fclose (fh);
408
409   return (0);
410 } /* }}} int write_pidfile */
411
412 static int remove_pidfile (void) /* {{{ */
413 {
414   char *file;
415   int status;
416
417   file = (config_pid_file != NULL)
418     ? config_pid_file
419     : LOCALSTATEDIR "/run/rrdcached.pid";
420
421   status = unlink (file);
422   if (status == 0)
423     return (0);
424   return (errno);
425 } /* }}} int remove_pidfile */
426
427 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
428 {
429   char *eol;
430
431   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
432                sock->next_read - sock->next_cmd);
433
434   if (eol == NULL)
435   {
436     /* no commands left, move remainder back to front of rbuf */
437     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
438             sock->next_read - sock->next_cmd);
439     sock->next_read -= sock->next_cmd;
440     sock->next_cmd = 0;
441     *len = 0;
442     return NULL;
443   }
444   else
445   {
446     char *cmd = sock->rbuf + sock->next_cmd;
447     *eol = '\0';
448
449     sock->next_cmd = eol - sock->rbuf + 1;
450
451     if (eol > sock->rbuf && *(eol-1) == '\r')
452       *(--eol) = '\0'; /* handle "\r\n" EOL */
453
454     *len = eol - cmd;
455
456     return cmd;
457   }
458
459   /* NOTREACHED */
460   assert(1==0);
461 }
462
463 /* add the characters directly to the write buffer */
464 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
465 {
466   char *new_buf;
467
468   assert(sock != NULL);
469
470   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
471   if (new_buf == NULL)
472   {
473     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
474     return -1;
475   }
476
477   strncpy(new_buf + sock->wbuf_len, str, len + 1);
478
479   sock->wbuf = new_buf;
480   sock->wbuf_len += len;
481
482   return 0;
483 } /* }}} static int add_to_wbuf */
484
485 /* add the text to the "extra" info that's sent after the status line */
486 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
487 {
488   va_list argp;
489   char buffer[CMD_MAX];
490   int len;
491
492   if (sock == NULL) return 0; /* journal replay mode */
493   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
494
495   va_start(argp, fmt);
496 #ifdef HAVE_VSNPRINTF
497   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
498 #else
499   len = vsprintf(buffer, fmt, argp);
500 #endif
501   va_end(argp);
502   if (len < 0)
503   {
504     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
505     return -1;
506   }
507
508   return add_to_wbuf(sock, buffer, len);
509 } /* }}} static int add_response_info */
510
511 static int count_lines(char *str) /* {{{ */
512 {
513   int lines = 0;
514
515   if (str != NULL)
516   {
517     while ((str = strchr(str, '\n')) != NULL)
518     {
519       ++lines;
520       ++str;
521     }
522   }
523
524   return lines;
525 } /* }}} static int count_lines */
526
527 /* send the response back to the user.
528  * returns 0 on success, -1 on error
529  * write buffer is always zeroed after this call */
530 static int send_response (listen_socket_t *sock, response_code rc,
531                           char *fmt, ...) /* {{{ */
532 {
533   va_list argp;
534   char buffer[CMD_MAX];
535   int lines;
536   ssize_t wrote;
537   int rclen, len;
538
539   if (sock == NULL) return rc;  /* journal replay mode */
540
541   if (sock->batch_start)
542   {
543     if (rc == RESP_OK)
544       return rc; /* no response on success during BATCH */
545     lines = sock->batch_cmd;
546   }
547   else if (rc == RESP_OK)
548     lines = count_lines(sock->wbuf);
549   else
550     lines = -1;
551
552   rclen = sprintf(buffer, "%d ", lines);
553   va_start(argp, fmt);
554 #ifdef HAVE_VSNPRINTF
555   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
556 #else
557   len = vsprintf(buffer+rclen, fmt, argp);
558 #endif
559   va_end(argp);
560   if (len < 0)
561     return -1;
562
563   len += rclen;
564
565   /* append the result to the wbuf, don't write to the user */
566   if (sock->batch_start)
567     return add_to_wbuf(sock, buffer, len);
568
569   /* first write must be complete */
570   if (len != write(sock->fd, buffer, len))
571   {
572     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
573     return -1;
574   }
575
576   if (sock->wbuf != NULL && rc == RESP_OK)
577   {
578     wrote = 0;
579     while (wrote < sock->wbuf_len)
580     {
581       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
582       if (wb <= 0)
583       {
584         RRDD_LOG(LOG_INFO, "send_response: could not write results");
585         return -1;
586       }
587       wrote += wb;
588     }
589   }
590
591   free(sock->wbuf); sock->wbuf = NULL;
592   sock->wbuf_len = 0;
593
594   return 0;
595 } /* }}} */
596
597 static void wipe_ci_values(cache_item_t *ci, time_t when)
598 {
599   ci->values = NULL;
600   ci->values_num = 0;
601
602   ci->last_flush_time = when;
603   if (config_write_jitter > 0)
604     ci->last_flush_time += (rrd_random() % config_write_jitter);
605 }
606
607 /* remove_from_queue
608  * remove a "cache_item_t" item from the queue.
609  * must hold 'cache_lock' when calling this
610  */
611 static void remove_from_queue(cache_item_t *ci) /* {{{ */
612 {
613   if (ci == NULL) return;
614   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
615
616   if (ci->prev == NULL)
617     cache_queue_head = ci->next; /* reset head */
618   else
619     ci->prev->next = ci->next;
620
621   if (ci->next == NULL)
622     cache_queue_tail = ci->prev; /* reset the tail */
623   else
624     ci->next->prev = ci->prev;
625
626   ci->next = ci->prev = NULL;
627   ci->flags &= ~CI_FLAGS_IN_QUEUE;
628
629   pthread_mutex_lock (&stats_lock);
630   assert (stats_queue_length > 0);
631   stats_queue_length--;
632   pthread_mutex_unlock (&stats_lock);
633
634 } /* }}} static void remove_from_queue */
635
636 /* free the resources associated with the cache_item_t
637  * must hold cache_lock when calling this function
638  */
639 static void *free_cache_item(cache_item_t *ci) /* {{{ */
640 {
641   if (ci == NULL) return NULL;
642
643   remove_from_queue(ci);
644
645   for (int i=0; i < ci->values_num; i++)
646     free(ci->values[i]);
647
648   free (ci->values);
649   free (ci->file);
650
651   /* in case anyone is waiting */
652   pthread_cond_broadcast(&ci->flushed);
653
654   free (ci);
655
656   return NULL;
657 } /* }}} static void *free_cache_item */
658
659 /*
660  * enqueue_cache_item:
661  * `cache_lock' must be acquired before calling this function!
662  */
663 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
664     queue_side_t side)
665 {
666   if (ci == NULL)
667     return (-1);
668
669   if (ci->values_num == 0)
670     return (0);
671
672   if (side == HEAD)
673   {
674     if (cache_queue_head == ci)
675       return 0;
676
677     /* remove if further down in queue */
678     remove_from_queue(ci);
679
680     ci->prev = NULL;
681     ci->next = cache_queue_head;
682     if (ci->next != NULL)
683       ci->next->prev = ci;
684     cache_queue_head = ci;
685
686     if (cache_queue_tail == NULL)
687       cache_queue_tail = cache_queue_head;
688   }
689   else /* (side == TAIL) */
690   {
691     /* We don't move values back in the list.. */
692     if (ci->flags & CI_FLAGS_IN_QUEUE)
693       return (0);
694
695     assert (ci->next == NULL);
696     assert (ci->prev == NULL);
697
698     ci->prev = cache_queue_tail;
699
700     if (cache_queue_tail == NULL)
701       cache_queue_head = ci;
702     else
703       cache_queue_tail->next = ci;
704
705     cache_queue_tail = ci;
706   }
707
708   ci->flags |= CI_FLAGS_IN_QUEUE;
709
710   pthread_cond_signal(&queue_cond);
711   pthread_mutex_lock (&stats_lock);
712   stats_queue_length++;
713   pthread_mutex_unlock (&stats_lock);
714
715   return (0);
716 } /* }}} int enqueue_cache_item */
717
718 /*
719  * tree_callback_flush:
720  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
721  * while this is in progress.
722  */
723 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
724     gpointer data)
725 {
726   cache_item_t *ci;
727   callback_flush_data_t *cfd;
728
729   ci = (cache_item_t *) value;
730   cfd = (callback_flush_data_t *) data;
731
732   if (ci->flags & CI_FLAGS_IN_QUEUE)
733     return FALSE;
734
735   if ((ci->last_flush_time <= cfd->abs_timeout)
736       && (ci->values_num > 0))
737   {
738     enqueue_cache_item (ci, TAIL);
739   }
740   else if ((do_shutdown != 0)
741       && (ci->values_num > 0))
742   {
743     enqueue_cache_item (ci, TAIL);
744   }
745   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
746       && (ci->values_num <= 0))
747   {
748     char **temp;
749
750     temp = (char **) rrd_realloc (cfd->keys,
751         sizeof (char *) * (cfd->keys_num + 1));
752     if (temp == NULL)
753     {
754       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
755       return (FALSE);
756     }
757     cfd->keys = temp;
758     /* Make really sure this points to the _same_ place */
759     assert ((char *) key == ci->file);
760     cfd->keys[cfd->keys_num] = (char *) key;
761     cfd->keys_num++;
762   }
763
764   return (FALSE);
765 } /* }}} gboolean tree_callback_flush */
766
767 static int flush_old_values (int max_age)
768 {
769   callback_flush_data_t cfd;
770   size_t k;
771
772   memset (&cfd, 0, sizeof (cfd));
773   /* Pass the current time as user data so that we don't need to call
774    * `time' for each node. */
775   cfd.now = time (NULL);
776   cfd.keys = NULL;
777   cfd.keys_num = 0;
778
779   if (max_age > 0)
780     cfd.abs_timeout = cfd.now - max_age;
781   else
782     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
783
784   /* `tree_callback_flush' will return the keys of all values that haven't
785    * been touched in the last `config_flush_interval' seconds in `cfd'.
786    * The char*'s in this array point to the same memory as ci->file, so we
787    * don't need to free them separately. */
788   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
789
790   for (k = 0; k < cfd.keys_num; k++)
791   {
792     /* should never fail, since we have held the cache_lock
793      * the entire time */
794     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
795   }
796
797   if (cfd.keys != NULL)
798   {
799     free (cfd.keys);
800     cfd.keys = NULL;
801   }
802
803   return (0);
804 } /* int flush_old_values */
805
806 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
807 {
808   struct timeval now;
809   struct timespec next_flush;
810   int status;
811
812   gettimeofday (&now, NULL);
813   next_flush.tv_sec = now.tv_sec + config_flush_interval;
814   next_flush.tv_nsec = 1000 * now.tv_usec;
815
816   pthread_mutex_lock(&cache_lock);
817
818   while (!do_shutdown)
819   {
820     gettimeofday (&now, NULL);
821     if ((now.tv_sec > next_flush.tv_sec)
822         || ((now.tv_sec == next_flush.tv_sec)
823           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
824     {
825       /* Flush all values that haven't been written in the last
826        * `config_write_interval' seconds. */
827       flush_old_values (config_write_interval);
828
829       /* Determine the time of the next cache flush. */
830       next_flush.tv_sec =
831         now.tv_sec + next_flush.tv_sec % config_flush_interval;
832
833       /* unlock the cache while we rotate so we don't block incoming
834        * updates if the fsync() blocks on disk I/O */
835       pthread_mutex_unlock(&cache_lock);
836       journal_rotate();
837       pthread_mutex_lock(&cache_lock);
838     }
839
840     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
841     if (status != 0 && status != ETIMEDOUT)
842     {
843       RRDD_LOG (LOG_ERR, "flush_thread_main: "
844                 "pthread_cond_timedwait returned %i.", status);
845     }
846   }
847
848   if (config_flush_at_shutdown)
849     flush_old_values (-1); /* flush everything */
850
851   pthread_mutex_unlock(&cache_lock);
852
853   return NULL;
854 } /* void *flush_thread_main */
855
856 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
857 {
858   pthread_mutex_lock (&cache_lock);
859
860   while (!do_shutdown
861          || (cache_queue_head != NULL && config_flush_at_shutdown))
862   {
863     cache_item_t *ci;
864     char *file;
865     char **values;
866     int values_num;
867     int status;
868     int i;
869
870     /* Now, check if there's something to store away. If not, wait until
871      * something comes in.  if we are shutting down, do not wait around.  */
872     if (cache_queue_head == NULL && !do_shutdown)
873     {
874       status = pthread_cond_wait (&queue_cond, &cache_lock);
875       if ((status != 0) && (status != ETIMEDOUT))
876       {
877         RRDD_LOG (LOG_ERR, "queue_thread_main: "
878             "pthread_cond_wait returned %i.", status);
879       }
880     }
881
882     /* Check if a value has arrived. This may be NULL if we timed out or there
883      * was an interrupt such as a signal. */
884     if (cache_queue_head == NULL)
885       continue;
886
887     ci = cache_queue_head;
888
889     /* copy the relevant parts */
890     file = strdup (ci->file);
891     if (file == NULL)
892     {
893       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
894       continue;
895     }
896
897     assert(ci->values != NULL);
898     assert(ci->values_num > 0);
899
900     values = ci->values;
901     values_num = ci->values_num;
902
903     wipe_ci_values(ci, time(NULL));
904     remove_from_queue(ci);
905
906     pthread_mutex_unlock (&cache_lock);
907
908     rrd_clear_error ();
909     status = rrd_update_r (file, NULL, values_num, (void *) values);
910     if (status != 0)
911     {
912       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
913           "rrd_update_r (%s) failed with status %i. (%s)",
914           file, status, rrd_get_error());
915     }
916
917     journal_write("wrote", file);
918     pthread_cond_broadcast(&ci->flushed);
919
920     for (i = 0; i < values_num; i++)
921       free (values[i]);
922
923     free(values);
924     free(file);
925
926     if (status == 0)
927     {
928       pthread_mutex_lock (&stats_lock);
929       stats_updates_written++;
930       stats_data_sets_written += values_num;
931       pthread_mutex_unlock (&stats_lock);
932     }
933
934     pthread_mutex_lock (&cache_lock);
935   }
936   pthread_mutex_unlock (&cache_lock);
937
938   return (NULL);
939 } /* }}} void *queue_thread_main */
940
941 static int buffer_get_field (char **buffer_ret, /* {{{ */
942     size_t *buffer_size_ret, char **field_ret)
943 {
944   char *buffer;
945   size_t buffer_pos;
946   size_t buffer_size;
947   char *field;
948   size_t field_size;
949   int status;
950
951   buffer = *buffer_ret;
952   buffer_pos = 0;
953   buffer_size = *buffer_size_ret;
954   field = *buffer_ret;
955   field_size = 0;
956
957   if (buffer_size <= 0)
958     return (-1);
959
960   /* This is ensured by `handle_request'. */
961   assert (buffer[buffer_size - 1] == '\0');
962
963   status = -1;
964   while (buffer_pos < buffer_size)
965   {
966     /* Check for end-of-field or end-of-buffer */
967     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
968     {
969       field[field_size] = 0;
970       field_size++;
971       buffer_pos++;
972       status = 0;
973       break;
974     }
975     /* Handle escaped characters. */
976     else if (buffer[buffer_pos] == '\\')
977     {
978       if (buffer_pos >= (buffer_size - 1))
979         break;
980       buffer_pos++;
981       field[field_size] = buffer[buffer_pos];
982       field_size++;
983       buffer_pos++;
984     }
985     /* Normal operation */ 
986     else
987     {
988       field[field_size] = buffer[buffer_pos];
989       field_size++;
990       buffer_pos++;
991     }
992   } /* while (buffer_pos < buffer_size) */
993
994   if (status != 0)
995     return (status);
996
997   *buffer_ret = buffer + buffer_pos;
998   *buffer_size_ret = buffer_size - buffer_pos;
999   *field_ret = field;
1000
1001   return (0);
1002 } /* }}} int buffer_get_field */
1003
1004 /* if we're restricting writes to the base directory,
1005  * check whether the file falls within the dir
1006  * returns 1 if OK, otherwise 0
1007  */
1008 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1009 {
1010   assert(file != NULL);
1011
1012   if (!config_write_base_only
1013       || sock == NULL /* journal replay */
1014       || config_base_dir == NULL)
1015     return 1;
1016
1017   if (strstr(file, "../") != NULL) goto err;
1018
1019   /* relative paths without "../" are ok */
1020   if (*file != '/') return 1;
1021
1022   /* file must be of the format base + "/" + <1+ char filename> */
1023   if (strlen(file) < _config_base_dir_len + 2) goto err;
1024   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1025   if (*(file + _config_base_dir_len) != '/') goto err;
1026
1027   return 1;
1028
1029 err:
1030   if (sock != NULL && sock->fd >= 0)
1031     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1032
1033   return 0;
1034 } /* }}} static int check_file_access */
1035
1036 /* when using a base dir, convert relative paths to absolute paths.
1037  * if necessary, modifies the "filename" pointer to point
1038  * to the new path created in "tmp".  "tmp" is provided
1039  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1040  *
1041  * this allows us to optimize for the expected case (absolute path)
1042  * with a no-op.
1043  */
1044 static void get_abs_path(char **filename, char *tmp)
1045 {
1046   assert(tmp != NULL);
1047   assert(filename != NULL && *filename != NULL);
1048
1049   if (config_base_dir == NULL || **filename == '/')
1050     return;
1051
1052   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1053   *filename = tmp;
1054 } /* }}} static int get_abs_path */
1055
1056 /* returns 1 if we have the required privilege level,
1057  * otherwise issue an error to the user on sock */
1058 static int has_privilege (listen_socket_t *sock, /* {{{ */
1059                           socket_privilege priv)
1060 {
1061   if (sock == NULL) /* journal replay */
1062     return 1;
1063
1064   if (sock->privilege >= priv)
1065     return 1;
1066
1067   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1068 } /* }}} static int has_privilege */
1069
1070 static int flush_file (const char *filename) /* {{{ */
1071 {
1072   cache_item_t *ci;
1073
1074   pthread_mutex_lock (&cache_lock);
1075
1076   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1077   if (ci == NULL)
1078   {
1079     pthread_mutex_unlock (&cache_lock);
1080     return (ENOENT);
1081   }
1082
1083   if (ci->values_num > 0)
1084   {
1085     /* Enqueue at head */
1086     enqueue_cache_item (ci, HEAD);
1087     pthread_cond_wait(&ci->flushed, &cache_lock);
1088   }
1089
1090   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1091    * may have been purged during our cond_wait() */
1092
1093   pthread_mutex_unlock(&cache_lock);
1094
1095   return (0);
1096 } /* }}} int flush_file */
1097
1098 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1099 {
1100   char *err = "Syntax error.\n";
1101
1102   if (cmd && cmd->syntax)
1103     err = cmd->syntax;
1104
1105   return send_response(sock, RESP_ERR, "Usage: %s", err);
1106 } /* }}} static int syntax_error() */
1107
1108 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1109 {
1110   uint64_t copy_queue_length;
1111   uint64_t copy_updates_received;
1112   uint64_t copy_flush_received;
1113   uint64_t copy_updates_written;
1114   uint64_t copy_data_sets_written;
1115   uint64_t copy_journal_bytes;
1116   uint64_t copy_journal_rotate;
1117
1118   uint64_t tree_nodes_number;
1119   uint64_t tree_depth;
1120
1121   pthread_mutex_lock (&stats_lock);
1122   copy_queue_length       = stats_queue_length;
1123   copy_updates_received   = stats_updates_received;
1124   copy_flush_received     = stats_flush_received;
1125   copy_updates_written    = stats_updates_written;
1126   copy_data_sets_written  = stats_data_sets_written;
1127   copy_journal_bytes      = stats_journal_bytes;
1128   copy_journal_rotate     = stats_journal_rotate;
1129   pthread_mutex_unlock (&stats_lock);
1130
1131   pthread_mutex_lock (&cache_lock);
1132   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1133   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1134   pthread_mutex_unlock (&cache_lock);
1135
1136   add_response_info(sock,
1137                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1138   add_response_info(sock,
1139                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1140   add_response_info(sock,
1141                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1142   add_response_info(sock,
1143                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1144   add_response_info(sock,
1145                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1146   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1147   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1148   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1149   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1150
1151   send_response(sock, RESP_OK, "Statistics follow\n");
1152
1153   return (0);
1154 } /* }}} int handle_request_stats */
1155
1156 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1157 {
1158   char *file, file_tmp[PATH_MAX];
1159   int status;
1160
1161   status = buffer_get_field (&buffer, &buffer_size, &file);
1162   if (status != 0)
1163   {
1164     return syntax_error(sock,cmd);
1165   }
1166   else
1167   {
1168     pthread_mutex_lock(&stats_lock);
1169     stats_flush_received++;
1170     pthread_mutex_unlock(&stats_lock);
1171
1172     get_abs_path(&file, file_tmp);
1173     if (!check_file_access(file, sock)) return 0;
1174
1175     status = flush_file (file);
1176     if (status == 0)
1177       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1178     else if (status == ENOENT)
1179     {
1180       /* no file in our tree; see whether it exists at all */
1181       struct stat statbuf;
1182
1183       memset(&statbuf, 0, sizeof(statbuf));
1184       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1185         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1186       else
1187         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1188     }
1189     else if (status < 0)
1190       return send_response(sock, RESP_ERR, "Internal error.\n");
1191     else
1192       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1193   }
1194
1195   /* NOTREACHED */
1196   assert(1==0);
1197 } /* }}} int handle_request_flush */
1198
1199 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1200 {
1201   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1202
1203   pthread_mutex_lock(&cache_lock);
1204   flush_old_values(-1);
1205   pthread_mutex_unlock(&cache_lock);
1206
1207   return send_response(sock, RESP_OK, "Started flush.\n");
1208 } /* }}} static int handle_request_flushall */
1209
1210 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1211 {
1212   int status;
1213   char *file, file_tmp[PATH_MAX];
1214   cache_item_t *ci;
1215
1216   status = buffer_get_field(&buffer, &buffer_size, &file);
1217   if (status != 0)
1218     return syntax_error(sock,cmd);
1219
1220   get_abs_path(&file, file_tmp);
1221
1222   pthread_mutex_lock(&cache_lock);
1223   ci = g_tree_lookup(cache_tree, file);
1224   if (ci == NULL)
1225   {
1226     pthread_mutex_unlock(&cache_lock);
1227     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1228   }
1229
1230   for (int i=0; i < ci->values_num; i++)
1231     add_response_info(sock, "%s\n", ci->values[i]);
1232
1233   pthread_mutex_unlock(&cache_lock);
1234   return send_response(sock, RESP_OK, "updates pending\n");
1235 } /* }}} static int handle_request_pending */
1236
1237 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1238 {
1239   int status;
1240   gboolean found;
1241   char *file, file_tmp[PATH_MAX];
1242
1243   status = buffer_get_field(&buffer, &buffer_size, &file);
1244   if (status != 0)
1245     return syntax_error(sock,cmd);
1246
1247   get_abs_path(&file, file_tmp);
1248   if (!check_file_access(file, sock)) return 0;
1249
1250   pthread_mutex_lock(&cache_lock);
1251   found = g_tree_remove(cache_tree, file);
1252   pthread_mutex_unlock(&cache_lock);
1253
1254   if (found == TRUE)
1255   {
1256     if (sock != NULL)
1257       journal_write("forget", file);
1258
1259     return send_response(sock, RESP_OK, "Gone!\n");
1260   }
1261   else
1262     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1263
1264   /* NOTREACHED */
1265   assert(1==0);
1266 } /* }}} static int handle_request_forget */
1267
1268 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1269 {
1270   cache_item_t *ci;
1271
1272   pthread_mutex_lock(&cache_lock);
1273
1274   ci = cache_queue_head;
1275   while (ci != NULL)
1276   {
1277     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1278     ci = ci->next;
1279   }
1280
1281   pthread_mutex_unlock(&cache_lock);
1282
1283   return send_response(sock, RESP_OK, "in queue.\n");
1284 } /* }}} int handle_request_queue */
1285
1286 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1287 {
1288   char *file, file_tmp[PATH_MAX];
1289   int values_num = 0;
1290   int status;
1291   char orig_buf[CMD_MAX];
1292
1293   cache_item_t *ci;
1294
1295   /* save it for the journal later */
1296   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1297
1298   status = buffer_get_field (&buffer, &buffer_size, &file);
1299   if (status != 0)
1300     return syntax_error(sock,cmd);
1301
1302   pthread_mutex_lock(&stats_lock);
1303   stats_updates_received++;
1304   pthread_mutex_unlock(&stats_lock);
1305
1306   get_abs_path(&file, file_tmp);
1307   if (!check_file_access(file, sock)) return 0;
1308
1309   pthread_mutex_lock (&cache_lock);
1310   ci = g_tree_lookup (cache_tree, file);
1311
1312   if (ci == NULL) /* {{{ */
1313   {
1314     struct stat statbuf;
1315
1316     /* don't hold the lock while we setup; stat(2) might block */
1317     pthread_mutex_unlock(&cache_lock);
1318
1319     memset (&statbuf, 0, sizeof (statbuf));
1320     status = stat (file, &statbuf);
1321     if (status != 0)
1322     {
1323       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1324
1325       status = errno;
1326       if (status == ENOENT)
1327         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1328       else
1329         return send_response(sock, RESP_ERR,
1330                              "stat failed with error %i.\n", status);
1331     }
1332     if (!S_ISREG (statbuf.st_mode))
1333       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1334
1335     if (access(file, R_OK|W_OK) != 0)
1336       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1337                            file, rrd_strerror(errno));
1338
1339     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1340     if (ci == NULL)
1341     {
1342       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1343
1344       return send_response(sock, RESP_ERR, "malloc failed.\n");
1345     }
1346     memset (ci, 0, sizeof (cache_item_t));
1347
1348     ci->file = strdup (file);
1349     if (ci->file == NULL)
1350     {
1351       free (ci);
1352       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1353
1354       return send_response(sock, RESP_ERR, "strdup failed.\n");
1355     }
1356
1357     wipe_ci_values(ci, now);
1358     ci->flags = CI_FLAGS_IN_TREE;
1359     pthread_cond_init(&ci->flushed, NULL);
1360
1361     pthread_mutex_lock(&cache_lock);
1362     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1363   } /* }}} */
1364   assert (ci != NULL);
1365
1366   /* don't re-write updates in replay mode */
1367   if (sock != NULL)
1368     journal_write("update", orig_buf);
1369
1370   while (buffer_size > 0)
1371   {
1372     char **temp;
1373     char *value;
1374     time_t stamp;
1375     char *eostamp;
1376
1377     status = buffer_get_field (&buffer, &buffer_size, &value);
1378     if (status != 0)
1379     {
1380       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1381       break;
1382     }
1383
1384     /* make sure update time is always moving forward */
1385     stamp = strtol(value, &eostamp, 10);
1386     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1387     {
1388       pthread_mutex_unlock(&cache_lock);
1389       return send_response(sock, RESP_ERR,
1390                            "Cannot find timestamp in '%s'!\n", value);
1391     }
1392     else if (stamp <= ci->last_update_stamp)
1393     {
1394       pthread_mutex_unlock(&cache_lock);
1395       return send_response(sock, RESP_ERR,
1396                            "illegal attempt to update using time %ld when last"
1397                            " update time is %ld (minimum one second step)\n",
1398                            stamp, ci->last_update_stamp);
1399     }
1400     else
1401       ci->last_update_stamp = stamp;
1402
1403     temp = (char **) rrd_realloc (ci->values,
1404         sizeof (char *) * (ci->values_num + 1));
1405     if (temp == NULL)
1406     {
1407       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1408       continue;
1409     }
1410     ci->values = temp;
1411
1412     ci->values[ci->values_num] = strdup (value);
1413     if (ci->values[ci->values_num] == NULL)
1414     {
1415       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1416       continue;
1417     }
1418     ci->values_num++;
1419
1420     values_num++;
1421   }
1422
1423   if (((now - ci->last_flush_time) >= config_write_interval)
1424       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1425       && (ci->values_num > 0))
1426   {
1427     enqueue_cache_item (ci, TAIL);
1428   }
1429
1430   pthread_mutex_unlock (&cache_lock);
1431
1432   if (values_num < 1)
1433     return send_response(sock, RESP_ERR, "No values updated.\n");
1434   else
1435     return send_response(sock, RESP_OK,
1436                          "errors, enqueued %i value(s).\n", values_num);
1437
1438   /* NOTREACHED */
1439   assert(1==0);
1440
1441 } /* }}} int handle_request_update */
1442
1443 /* we came across a "WROTE" entry during journal replay.
1444  * throw away any values that we have accumulated for this file
1445  */
1446 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1447 {
1448   int i;
1449   cache_item_t *ci;
1450   const char *file = buffer;
1451
1452   pthread_mutex_lock(&cache_lock);
1453
1454   ci = g_tree_lookup(cache_tree, file);
1455   if (ci == NULL)
1456   {
1457     pthread_mutex_unlock(&cache_lock);
1458     return (0);
1459   }
1460
1461   if (ci->values)
1462   {
1463     for (i=0; i < ci->values_num; i++)
1464       free(ci->values[i]);
1465
1466     free(ci->values);
1467   }
1468
1469   wipe_ci_values(ci, now);
1470   remove_from_queue(ci);
1471
1472   pthread_mutex_unlock(&cache_lock);
1473   return (0);
1474 } /* }}} int handle_request_wrote */
1475
1476 /* start "BATCH" processing */
1477 static int batch_start (HANDLER_PROTO) /* {{{ */
1478 {
1479   int status;
1480   if (sock->batch_start)
1481     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1482
1483   status = send_response(sock, RESP_OK,
1484                          "Go ahead.  End with dot '.' on its own line.\n");
1485   sock->batch_start = time(NULL);
1486   sock->batch_cmd = 0;
1487
1488   return status;
1489 } /* }}} static int batch_start */
1490
1491 /* finish "BATCH" processing and return results to the client */
1492 static int batch_done (HANDLER_PROTO) /* {{{ */
1493 {
1494   assert(sock->batch_start);
1495   sock->batch_start = 0;
1496   sock->batch_cmd  = 0;
1497   return send_response(sock, RESP_OK, "errors\n");
1498 } /* }}} static int batch_done */
1499
1500 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1501 {
1502   return -1;
1503 } /* }}} static int handle_request_quit */
1504
1505 struct command COMMANDS[] = {
1506   {
1507     "UPDATE",
1508     handle_request_update,
1509     PRIV_HIGH,
1510     CMD_CONTEXT_ANY,
1511     "UPDATE <filename> <values> [<values> ...]\n"
1512     ,
1513     "Adds the given file to the internal cache if it is not yet known and\n"
1514     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1515     "for details.\n"
1516     "\n"
1517     "Each <values> has the following form:\n"
1518     "  <values> = <time>:<value>[:<value>[...]]\n"
1519     "See the rrdupdate(1) manpage for details.\n"
1520   },
1521   {
1522     "WROTE",
1523     handle_request_wrote,
1524     PRIV_HIGH,
1525     CMD_CONTEXT_JOURNAL,
1526     NULL,
1527     NULL
1528   },
1529   {
1530     "FLUSH",
1531     handle_request_flush,
1532     PRIV_LOW,
1533     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1534     "FLUSH <filename>\n"
1535     ,
1536     "Adds the given filename to the head of the update queue and returns\n"
1537     "after it has been dequeued.\n"
1538   },
1539   {
1540     "FLUSHALL",
1541     handle_request_flushall,
1542     PRIV_HIGH,
1543     CMD_CONTEXT_CLIENT,
1544     "FLUSHALL\n"
1545     ,
1546     "Triggers writing of all pending updates.  Returns immediately.\n"
1547   },
1548   {
1549     "PENDING",
1550     handle_request_pending,
1551     PRIV_HIGH,
1552     CMD_CONTEXT_CLIENT,
1553     "PENDING <filename>\n"
1554     ,
1555     "Shows any 'pending' updates for a file, in order.\n"
1556     "The updates shown have not yet been written to the underlying RRD file.\n"
1557   },
1558   {
1559     "FORGET",
1560     handle_request_forget,
1561     PRIV_HIGH,
1562     CMD_CONTEXT_ANY,
1563     "FORGET <filename>\n"
1564     ,
1565     "Removes the file completely from the cache.\n"
1566     "Any pending updates for the file will be lost.\n"
1567   },
1568   {
1569     "QUEUE",
1570     handle_request_queue,
1571     PRIV_LOW,
1572     CMD_CONTEXT_CLIENT,
1573     "QUEUE\n"
1574     ,
1575         "Shows all files in the output queue.\n"
1576     "The output is zero or more lines in the following format:\n"
1577     "(where <num_vals> is the number of values to be written)\n"
1578     "\n"
1579     "<num_vals> <filename>\n"
1580   },
1581   {
1582     "STATS",
1583     handle_request_stats,
1584     PRIV_LOW,
1585     CMD_CONTEXT_CLIENT,
1586     "STATS\n"
1587     ,
1588     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1589     "a description of the values.\n"
1590   },
1591   {
1592     "HELP",
1593     handle_request_help,
1594     PRIV_LOW,
1595     CMD_CONTEXT_CLIENT,
1596     "HELP [<command>]\n",
1597     NULL, /* special! */
1598   },
1599   {
1600     "BATCH",
1601     batch_start,
1602     PRIV_LOW,
1603     CMD_CONTEXT_CLIENT,
1604     "BATCH\n"
1605     ,
1606     "The 'BATCH' command permits the client to initiate a bulk load\n"
1607     "   of commands to rrdcached.\n"
1608     "\n"
1609     "Usage:\n"
1610     "\n"
1611     "    client: BATCH\n"
1612     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1613     "    client: command #1\n"
1614     "    client: command #2\n"
1615     "    client: ... and so on\n"
1616     "    client: .\n"
1617     "    server: 2 errors\n"
1618     "    server: 7 message for command #7\n"
1619     "    server: 9 message for command #9\n"
1620     "\n"
1621     "For more information, consult the rrdcached(1) documentation.\n"
1622   },
1623   {
1624     ".",   /* BATCH terminator */
1625     batch_done,
1626     PRIV_LOW,
1627     CMD_CONTEXT_BATCH,
1628     NULL,
1629     NULL
1630   },
1631   {
1632     "QUIT",
1633     handle_request_quit,
1634     PRIV_LOW,
1635     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1636     "QUIT\n"
1637     ,
1638     "Disconnect from rrdcached.\n"
1639   },
1640   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1641 };
1642
1643 static struct command *find_command(char *cmd)
1644 {
1645   struct command *c = COMMANDS;
1646
1647   while (c->cmd != NULL)
1648   {
1649     if (strcasecmp(cmd, c->cmd) == 0)
1650       break;
1651     c++;
1652   }
1653
1654   if (c->cmd == NULL)
1655     return NULL;
1656   else
1657     return c;
1658 }
1659
1660 /* check whether commands are received in the expected context */
1661 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1662 {
1663   if (sock == NULL)
1664     return (cmd->context & CMD_CONTEXT_JOURNAL);
1665   else if (sock->batch_start)
1666     return (cmd->context & CMD_CONTEXT_BATCH);
1667   else
1668     return (cmd->context & CMD_CONTEXT_CLIENT);
1669
1670   /* NOTREACHED */
1671   assert(1==0);
1672 }
1673
1674 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1675 {
1676   int status;
1677   char *cmd_str;
1678   char *resp_txt;
1679   struct command *help = NULL;
1680
1681   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1682   if (status == 0)
1683     help = find_command(cmd_str);
1684
1685   if (help && (help->syntax || help->help))
1686   {
1687     char tmp[CMD_MAX];
1688
1689     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1690     resp_txt = tmp;
1691
1692     if (help->syntax)
1693       add_response_info(sock, "Usage: %s\n", help->syntax);
1694
1695     if (help->help)
1696       add_response_info(sock, "%s\n", help->help);
1697   }
1698   else
1699   {
1700     help = COMMANDS;
1701     resp_txt = "Command overview\n";
1702
1703     while (help->cmd)
1704     {
1705       if (help->syntax)
1706         add_response_info(sock, "%s", help->syntax);
1707       help++;
1708     }
1709   }
1710
1711   return send_response(sock, RESP_OK, resp_txt);
1712 } /* }}} int handle_request_help */
1713
1714 /* if sock==NULL, we are in journal replay mode */
1715 static int handle_request (DISPATCH_PROTO) /* {{{ */
1716 {
1717   char *buffer_ptr = buffer;
1718   char *cmd_str = NULL;
1719   struct command *cmd = NULL;
1720   int status;
1721
1722   assert (buffer[buffer_size - 1] == '\0');
1723
1724   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1725   if (status != 0)
1726   {
1727     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1728     return (-1);
1729   }
1730
1731   if (sock != NULL && sock->batch_start)
1732     sock->batch_cmd++;
1733
1734   cmd = find_command(cmd_str);
1735   if (!cmd)
1736     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1737
1738   status = has_privilege(sock, cmd->min_priv);
1739   if (status <= 0)
1740     return status;
1741
1742   if (!command_check_context(sock, cmd))
1743     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1744
1745   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1746 } /* }}} int handle_request */
1747
1748 /* MUST NOT hold journal_lock before calling this */
1749 static void journal_rotate(void) /* {{{ */
1750 {
1751   FILE *old_fh = NULL;
1752   int new_fd;
1753
1754   if (journal_cur == NULL || journal_old == NULL)
1755     return;
1756
1757   pthread_mutex_lock(&journal_lock);
1758
1759   /* we rotate this way (rename before close) so that the we can release
1760    * the journal lock as fast as possible.  Journal writes to the new
1761    * journal can proceed immediately after the new file is opened.  The
1762    * fclose can then block without affecting new updates.
1763    */
1764   if (journal_fh != NULL)
1765   {
1766     old_fh = journal_fh;
1767     journal_fh = NULL;
1768     rename(journal_cur, journal_old);
1769     ++stats_journal_rotate;
1770   }
1771
1772   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1773                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1774   if (new_fd >= 0)
1775   {
1776     journal_fh = fdopen(new_fd, "a");
1777     if (journal_fh == NULL)
1778       close(new_fd);
1779   }
1780
1781   pthread_mutex_unlock(&journal_lock);
1782
1783   if (old_fh != NULL)
1784     fclose(old_fh);
1785
1786   if (journal_fh == NULL)
1787   {
1788     RRDD_LOG(LOG_CRIT,
1789              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1790              journal_cur, rrd_strerror(errno));
1791
1792     RRDD_LOG(LOG_ERR,
1793              "JOURNALING DISABLED: All values will be flushed at shutdown");
1794     config_flush_at_shutdown = 1;
1795   }
1796
1797 } /* }}} static void journal_rotate */
1798
1799 static void journal_done(void) /* {{{ */
1800 {
1801   if (journal_cur == NULL)
1802     return;
1803
1804   pthread_mutex_lock(&journal_lock);
1805   if (journal_fh != NULL)
1806   {
1807     fclose(journal_fh);
1808     journal_fh = NULL;
1809   }
1810
1811   if (config_flush_at_shutdown)
1812   {
1813     RRDD_LOG(LOG_INFO, "removing journals");
1814     unlink(journal_old);
1815     unlink(journal_cur);
1816   }
1817   else
1818   {
1819     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1820              "journals will be used at next startup");
1821   }
1822
1823   pthread_mutex_unlock(&journal_lock);
1824
1825 } /* }}} static void journal_done */
1826
1827 static int journal_write(char *cmd, char *args) /* {{{ */
1828 {
1829   int chars;
1830
1831   if (journal_fh == NULL)
1832     return 0;
1833
1834   pthread_mutex_lock(&journal_lock);
1835   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1836   pthread_mutex_unlock(&journal_lock);
1837
1838   if (chars > 0)
1839   {
1840     pthread_mutex_lock(&stats_lock);
1841     stats_journal_bytes += chars;
1842     pthread_mutex_unlock(&stats_lock);
1843   }
1844
1845   return chars;
1846 } /* }}} static int journal_write */
1847
1848 static int journal_replay (const char *file) /* {{{ */
1849 {
1850   FILE *fh;
1851   int entry_cnt = 0;
1852   int fail_cnt = 0;
1853   uint64_t line = 0;
1854   char entry[CMD_MAX];
1855   time_t now;
1856
1857   if (file == NULL) return 0;
1858
1859   {
1860     char *reason = "unknown error";
1861     int status = 0;
1862     struct stat statbuf;
1863
1864     memset(&statbuf, 0, sizeof(statbuf));
1865     if (stat(file, &statbuf) != 0)
1866     {
1867       if (errno == ENOENT)
1868         return 0;
1869
1870       reason = "stat error";
1871       status = errno;
1872     }
1873     else if (!S_ISREG(statbuf.st_mode))
1874     {
1875       reason = "not a regular file";
1876       status = EPERM;
1877     }
1878     if (statbuf.st_uid != daemon_uid)
1879     {
1880       reason = "not owned by daemon user";
1881       status = EACCES;
1882     }
1883     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1884     {
1885       reason = "must not be user/group writable";
1886       status = EACCES;
1887     }
1888
1889     if (status != 0)
1890     {
1891       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1892                file, rrd_strerror(status), reason);
1893       return 0;
1894     }
1895   }
1896
1897   fh = fopen(file, "r");
1898   if (fh == NULL)
1899   {
1900     if (errno != ENOENT)
1901       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1902                file, rrd_strerror(errno));
1903     return 0;
1904   }
1905   else
1906     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1907
1908   now = time(NULL);
1909
1910   while(!feof(fh))
1911   {
1912     size_t entry_len;
1913
1914     ++line;
1915     if (fgets(entry, sizeof(entry), fh) == NULL)
1916       break;
1917     entry_len = strlen(entry);
1918
1919     /* check \n termination in case journal writing crashed mid-line */
1920     if (entry_len == 0)
1921       continue;
1922     else if (entry[entry_len - 1] != '\n')
1923     {
1924       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1925       ++fail_cnt;
1926       continue;
1927     }
1928
1929     entry[entry_len - 1] = '\0';
1930
1931     if (handle_request(NULL, now, entry, entry_len) == 0)
1932       ++entry_cnt;
1933     else
1934       ++fail_cnt;
1935   }
1936
1937   fclose(fh);
1938
1939   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1940            entry_cnt, fail_cnt);
1941
1942   return entry_cnt > 0 ? 1 : 0;
1943 } /* }}} static int journal_replay */
1944
1945 static void journal_init(void) /* {{{ */
1946 {
1947   int had_journal = 0;
1948
1949   if (journal_cur == NULL) return;
1950
1951   pthread_mutex_lock(&journal_lock);
1952
1953   RRDD_LOG(LOG_INFO, "checking for journal files");
1954
1955   had_journal += journal_replay(journal_old);
1956   had_journal += journal_replay(journal_cur);
1957
1958   /* it must have been a crash.  start a flush */
1959   if (had_journal && config_flush_at_shutdown)
1960     flush_old_values(-1);
1961
1962   pthread_mutex_unlock(&journal_lock);
1963   journal_rotate();
1964
1965   RRDD_LOG(LOG_INFO, "journal processing complete");
1966
1967 } /* }}} static void journal_init */
1968
1969 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1970 {
1971   assert(sock != NULL);
1972
1973   free(sock->rbuf);  sock->rbuf = NULL;
1974   free(sock->wbuf);  sock->wbuf = NULL;
1975   free(sock);
1976 } /* }}} void free_listen_socket */
1977
1978 static void close_connection(listen_socket_t *sock) /* {{{ */
1979 {
1980   if (sock->fd >= 0)
1981   {
1982     close(sock->fd);
1983     sock->fd = -1;
1984   }
1985
1986   free_listen_socket(sock);
1987
1988 } /* }}} void close_connection */
1989
1990 static void *connection_thread_main (void *args) /* {{{ */
1991 {
1992   listen_socket_t *sock;
1993   int fd;
1994
1995   sock = (listen_socket_t *) args;
1996   fd = sock->fd;
1997
1998   /* init read buffers */
1999   sock->next_read = sock->next_cmd = 0;
2000   sock->rbuf = malloc(RBUF_SIZE);
2001   if (sock->rbuf == NULL)
2002   {
2003     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2004     close_connection(sock);
2005     return NULL;
2006   }
2007
2008   pthread_mutex_lock (&connection_threads_lock);
2009   connection_threads_num++;
2010   pthread_mutex_unlock (&connection_threads_lock);
2011
2012   while (do_shutdown == 0)
2013   {
2014     char *cmd;
2015     ssize_t cmd_len;
2016     ssize_t rbytes;
2017     time_t now;
2018
2019     struct pollfd pollfd;
2020     int status;
2021
2022     pollfd.fd = fd;
2023     pollfd.events = POLLIN | POLLPRI;
2024     pollfd.revents = 0;
2025
2026     status = poll (&pollfd, 1, /* timeout = */ 500);
2027     if (do_shutdown)
2028       break;
2029     else if (status == 0) /* timeout */
2030       continue;
2031     else if (status < 0) /* error */
2032     {
2033       status = errno;
2034       if (status != EINTR)
2035         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2036       continue;
2037     }
2038
2039     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2040       break;
2041     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2042     {
2043       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2044           "poll(2) returned something unexpected: %#04hx",
2045           pollfd.revents);
2046       break;
2047     }
2048
2049     rbytes = read(fd, sock->rbuf + sock->next_read,
2050                   RBUF_SIZE - sock->next_read);
2051     if (rbytes < 0)
2052     {
2053       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2054       break;
2055     }
2056     else if (rbytes == 0)
2057       break; /* eof */
2058
2059     sock->next_read += rbytes;
2060
2061     if (sock->batch_start)
2062       now = sock->batch_start;
2063     else
2064       now = time(NULL);
2065
2066     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2067     {
2068       status = handle_request (sock, now, cmd, cmd_len+1);
2069       if (status != 0)
2070         goto out_close;
2071     }
2072   }
2073
2074 out_close:
2075   close_connection(sock);
2076
2077   /* Remove this thread from the connection threads list */
2078   pthread_mutex_lock (&connection_threads_lock);
2079   connection_threads_num--;
2080   if (connection_threads_num <= 0)
2081     pthread_cond_broadcast(&connection_threads_done);
2082   pthread_mutex_unlock (&connection_threads_lock);
2083
2084   return (NULL);
2085 } /* }}} void *connection_thread_main */
2086
2087 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2088 {
2089   int fd;
2090   struct sockaddr_un sa;
2091   listen_socket_t *temp;
2092   int status;
2093   const char *path;
2094
2095   path = sock->addr;
2096   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2097     path += strlen("unix:");
2098
2099   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2100       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2101   if (temp == NULL)
2102   {
2103     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2104     return (-1);
2105   }
2106   listen_fds = temp;
2107   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2108
2109   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2110   if (fd < 0)
2111   {
2112     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2113              rrd_strerror(errno));
2114     return (-1);
2115   }
2116
2117   memset (&sa, 0, sizeof (sa));
2118   sa.sun_family = AF_UNIX;
2119   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2120
2121   /* if we've gotten this far, we own the pid file.  any daemon started
2122    * with the same args must not be alive.  therefore, ensure that we can
2123    * create the socket...
2124    */
2125   unlink(path);
2126
2127   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2128   if (status != 0)
2129   {
2130     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2131              path, rrd_strerror(errno));
2132     close (fd);
2133     return (-1);
2134   }
2135
2136   status = listen (fd, /* backlog = */ 10);
2137   if (status != 0)
2138   {
2139     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2140              path, rrd_strerror(errno));
2141     close (fd);
2142     unlink (path);
2143     return (-1);
2144   }
2145
2146   listen_fds[listen_fds_num].fd = fd;
2147   listen_fds[listen_fds_num].family = PF_UNIX;
2148   strncpy(listen_fds[listen_fds_num].addr, path,
2149           sizeof (listen_fds[listen_fds_num].addr) - 1);
2150   listen_fds_num++;
2151
2152   return (0);
2153 } /* }}} int open_listen_socket_unix */
2154
2155 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2156 {
2157   struct addrinfo ai_hints;
2158   struct addrinfo *ai_res;
2159   struct addrinfo *ai_ptr;
2160   char addr_copy[NI_MAXHOST];
2161   char *addr;
2162   char *port;
2163   int status;
2164
2165   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2166   addr_copy[sizeof (addr_copy) - 1] = 0;
2167   addr = addr_copy;
2168
2169   memset (&ai_hints, 0, sizeof (ai_hints));
2170   ai_hints.ai_flags = 0;
2171 #ifdef AI_ADDRCONFIG
2172   ai_hints.ai_flags |= AI_ADDRCONFIG;
2173 #endif
2174   ai_hints.ai_family = AF_UNSPEC;
2175   ai_hints.ai_socktype = SOCK_STREAM;
2176
2177   port = NULL;
2178   if (*addr == '[') /* IPv6+port format */
2179   {
2180     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2181     addr++;
2182
2183     port = strchr (addr, ']');
2184     if (port == NULL)
2185     {
2186       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2187       return (-1);
2188     }
2189     *port = 0;
2190     port++;
2191
2192     if (*port == ':')
2193       port++;
2194     else if (*port == 0)
2195       port = NULL;
2196     else
2197     {
2198       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2199       return (-1);
2200     }
2201   } /* if (*addr = ']') */
2202   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2203   {
2204     port = rindex(addr, ':');
2205     if (port != NULL)
2206     {
2207       *port = 0;
2208       port++;
2209     }
2210   }
2211   ai_res = NULL;
2212   status = getaddrinfo (addr,
2213                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2214                         &ai_hints, &ai_res);
2215   if (status != 0)
2216   {
2217     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2218              addr, gai_strerror (status));
2219     return (-1);
2220   }
2221
2222   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2223   {
2224     int fd;
2225     listen_socket_t *temp;
2226     int one = 1;
2227
2228     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2229         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2230     if (temp == NULL)
2231     {
2232       fprintf (stderr,
2233                "rrdcached: open_listen_socket_network: realloc failed.\n");
2234       continue;
2235     }
2236     listen_fds = temp;
2237     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2238
2239     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2240     if (fd < 0)
2241     {
2242       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2243                rrd_strerror(errno));
2244       continue;
2245     }
2246
2247     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2248
2249     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2250     if (status != 0)
2251     {
2252       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2253                sock->addr, rrd_strerror(errno));
2254       close (fd);
2255       continue;
2256     }
2257
2258     status = listen (fd, /* backlog = */ 10);
2259     if (status != 0)
2260     {
2261       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2262                sock->addr, rrd_strerror(errno));
2263       close (fd);
2264       freeaddrinfo(ai_res);
2265       return (-1);
2266     }
2267
2268     listen_fds[listen_fds_num].fd = fd;
2269     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2270     listen_fds_num++;
2271   } /* for (ai_ptr) */
2272
2273   freeaddrinfo(ai_res);
2274   return (0);
2275 } /* }}} static int open_listen_socket_network */
2276
2277 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2278 {
2279   assert(sock != NULL);
2280   assert(sock->addr != NULL);
2281
2282   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2283       || sock->addr[0] == '/')
2284     return (open_listen_socket_unix(sock));
2285   else
2286     return (open_listen_socket_network(sock));
2287 } /* }}} int open_listen_socket */
2288
2289 static int close_listen_sockets (void) /* {{{ */
2290 {
2291   size_t i;
2292
2293   for (i = 0; i < listen_fds_num; i++)
2294   {
2295     close (listen_fds[i].fd);
2296
2297     if (listen_fds[i].family == PF_UNIX)
2298       unlink(listen_fds[i].addr);
2299   }
2300
2301   free (listen_fds);
2302   listen_fds = NULL;
2303   listen_fds_num = 0;
2304
2305   return (0);
2306 } /* }}} int close_listen_sockets */
2307
2308 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2309 {
2310   struct pollfd *pollfds;
2311   int pollfds_num;
2312   int status;
2313   int i;
2314
2315   if (listen_fds_num < 1)
2316   {
2317     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2318     return (NULL);
2319   }
2320
2321   pollfds_num = listen_fds_num;
2322   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2323   if (pollfds == NULL)
2324   {
2325     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2326     return (NULL);
2327   }
2328   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2329
2330   RRDD_LOG(LOG_INFO, "listening for connections");
2331
2332   while (do_shutdown == 0)
2333   {
2334     for (i = 0; i < pollfds_num; i++)
2335     {
2336       pollfds[i].fd = listen_fds[i].fd;
2337       pollfds[i].events = POLLIN | POLLPRI;
2338       pollfds[i].revents = 0;
2339     }
2340
2341     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2342     if (do_shutdown)
2343       break;
2344     else if (status == 0) /* timeout */
2345       continue;
2346     else if (status < 0) /* error */
2347     {
2348       status = errno;
2349       if (status != EINTR)
2350       {
2351         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2352       }
2353       continue;
2354     }
2355
2356     for (i = 0; i < pollfds_num; i++)
2357     {
2358       listen_socket_t *client_sock;
2359       struct sockaddr_storage client_sa;
2360       socklen_t client_sa_size;
2361       pthread_t tid;
2362       pthread_attr_t attr;
2363
2364       if (pollfds[i].revents == 0)
2365         continue;
2366
2367       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2368       {
2369         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2370             "poll(2) returned something unexpected for listen FD #%i.",
2371             pollfds[i].fd);
2372         continue;
2373       }
2374
2375       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2376       if (client_sock == NULL)
2377       {
2378         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2379         continue;
2380       }
2381       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2382
2383       client_sa_size = sizeof (client_sa);
2384       client_sock->fd = accept (pollfds[i].fd,
2385           (struct sockaddr *) &client_sa, &client_sa_size);
2386       if (client_sock->fd < 0)
2387       {
2388         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2389         free(client_sock);
2390         continue;
2391       }
2392
2393       pthread_attr_init (&attr);
2394       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2395
2396       status = pthread_create (&tid, &attr, connection_thread_main,
2397                                client_sock);
2398       if (status != 0)
2399       {
2400         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2401         close_connection(client_sock);
2402         continue;
2403       }
2404     } /* for (pollfds_num) */
2405   } /* while (do_shutdown == 0) */
2406
2407   RRDD_LOG(LOG_INFO, "starting shutdown");
2408
2409   close_listen_sockets ();
2410
2411   pthread_mutex_lock (&connection_threads_lock);
2412   while (connection_threads_num > 0)
2413     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2414   pthread_mutex_unlock (&connection_threads_lock);
2415
2416   free(pollfds);
2417
2418   return (NULL);
2419 } /* }}} void *listen_thread_main */
2420
2421 static int daemonize (void) /* {{{ */
2422 {
2423   int pid_fd;
2424   char *base_dir;
2425
2426   daemon_uid = geteuid();
2427
2428   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429   if (pid_fd < 0)
2430     pid_fd = check_pidfile();
2431   if (pid_fd < 0)
2432     return pid_fd;
2433
2434   /* open all the listen sockets */
2435   if (config_listen_address_list_len > 0)
2436   {
2437     for (int i = 0; i < config_listen_address_list_len; i++)
2438     {
2439       open_listen_socket (config_listen_address_list[i]);
2440       free_listen_socket (config_listen_address_list[i]);
2441     }
2442
2443     free(config_listen_address_list);
2444   }
2445   else
2446   {
2447     listen_socket_t sock;
2448     memset(&sock, 0, sizeof(sock));
2449     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2450     open_listen_socket (&sock);
2451   }
2452
2453   if (listen_fds_num < 1)
2454   {
2455     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2456     goto error;
2457   }
2458
2459   if (!stay_foreground)
2460   {
2461     pid_t child;
2462
2463     child = fork ();
2464     if (child < 0)
2465     {
2466       fprintf (stderr, "daemonize: fork(2) failed.\n");
2467       goto error;
2468     }
2469     else if (child > 0)
2470       exit(0);
2471
2472     /* Become session leader */
2473     setsid ();
2474
2475     /* Open the first three file descriptors to /dev/null */
2476     close (2);
2477     close (1);
2478     close (0);
2479
2480     open ("/dev/null", O_RDWR);
2481     dup (0);
2482     dup (0);
2483   } /* if (!stay_foreground) */
2484
2485   /* Change into the /tmp directory. */
2486   base_dir = (config_base_dir != NULL)
2487     ? config_base_dir
2488     : "/tmp";
2489
2490   if (chdir (base_dir) != 0)
2491   {
2492     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2493     goto error;
2494   }
2495
2496   install_signal_handlers();
2497
2498   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2499   RRDD_LOG(LOG_INFO, "starting up");
2500
2501   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2502                                 (GDestroyNotify) free_cache_item);
2503   if (cache_tree == NULL)
2504   {
2505     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2506     goto error;
2507   }
2508
2509   return write_pidfile (pid_fd);
2510
2511 error:
2512   remove_pidfile();
2513   return -1;
2514 } /* }}} int daemonize */
2515
2516 static int cleanup (void) /* {{{ */
2517 {
2518   do_shutdown++;
2519
2520   pthread_cond_broadcast (&flush_cond);
2521   pthread_join (flush_thread, NULL);
2522
2523   pthread_cond_broadcast (&queue_cond);
2524   for (int i = 0; i < config_queue_threads; i++)
2525     pthread_join (queue_threads[i], NULL);
2526
2527   if (config_flush_at_shutdown)
2528   {
2529     assert(cache_queue_head == NULL);
2530     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2531   }
2532
2533   journal_done();
2534   remove_pidfile ();
2535
2536   free(queue_threads);
2537   free(config_base_dir);
2538   free(config_pid_file);
2539   free(journal_cur);
2540   free(journal_old);
2541
2542   pthread_mutex_lock(&cache_lock);
2543   g_tree_destroy(cache_tree);
2544
2545   RRDD_LOG(LOG_INFO, "goodbye");
2546   closelog ();
2547
2548   return (0);
2549 } /* }}} int cleanup */
2550
2551 static int read_options (int argc, char **argv) /* {{{ */
2552 {
2553   int option;
2554   int status = 0;
2555
2556   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2557   {
2558     switch (option)
2559     {
2560       case 'g':
2561         stay_foreground=1;
2562         break;
2563
2564       case 'L':
2565       case 'l':
2566       {
2567         listen_socket_t **temp;
2568         listen_socket_t *new;
2569
2570         new = malloc(sizeof(listen_socket_t));
2571         if (new == NULL)
2572         {
2573           fprintf(stderr, "read_options: malloc failed.\n");
2574           return(2);
2575         }
2576         memset(new, 0, sizeof(listen_socket_t));
2577
2578         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2579             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2580         if (temp == NULL)
2581         {
2582           fprintf (stderr, "read_options: realloc failed.\n");
2583           return (2);
2584         }
2585         config_listen_address_list = temp;
2586
2587         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2588         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2589
2590         temp[config_listen_address_list_len] = new;
2591         config_listen_address_list_len++;
2592       }
2593       break;
2594
2595       case 'f':
2596       {
2597         int temp;
2598
2599         temp = atoi (optarg);
2600         if (temp > 0)
2601           config_flush_interval = temp;
2602         else
2603         {
2604           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2605           status = 3;
2606         }
2607       }
2608       break;
2609
2610       case 'w':
2611       {
2612         int temp;
2613
2614         temp = atoi (optarg);
2615         if (temp > 0)
2616           config_write_interval = temp;
2617         else
2618         {
2619           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2620           status = 2;
2621         }
2622       }
2623       break;
2624
2625       case 'z':
2626       {
2627         int temp;
2628
2629         temp = atoi(optarg);
2630         if (temp > 0)
2631           config_write_jitter = temp;
2632         else
2633         {
2634           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2635           status = 2;
2636         }
2637
2638         break;
2639       }
2640
2641       case 't':
2642       {
2643         int threads;
2644         threads = atoi(optarg);
2645         if (threads >= 1)
2646           config_queue_threads = threads;
2647         else
2648         {
2649           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2650           return 1;
2651         }
2652       }
2653       break;
2654
2655       case 'B':
2656         config_write_base_only = 1;
2657         break;
2658
2659       case 'b':
2660       {
2661         size_t len;
2662         char base_realpath[PATH_MAX];
2663
2664         if (config_base_dir != NULL)
2665           free (config_base_dir);
2666         config_base_dir = strdup (optarg);
2667         if (config_base_dir == NULL)
2668         {
2669           fprintf (stderr, "read_options: strdup failed.\n");
2670           return (3);
2671         }
2672
2673         /* make sure that the base directory is not resolved via
2674          * symbolic links.  this makes some performance-enhancing
2675          * assumptions possible (we don't have to resolve paths
2676          * that start with a "/")
2677          */
2678         if (realpath(config_base_dir, base_realpath) == NULL)
2679         {
2680           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2681           return 5;
2682         }
2683         else if (strncmp(config_base_dir,
2684                          base_realpath, sizeof(base_realpath)) != 0)
2685         {
2686           fprintf(stderr,
2687                   "Base directory (-b) resolved via file system links!\n"
2688                   "Please consult rrdcached '-b' documentation!\n"
2689                   "Consider specifying the real directory (%s)\n",
2690                   base_realpath);
2691           return 5;
2692         }
2693
2694         len = strlen (config_base_dir);
2695         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2696         {
2697           config_base_dir[len - 1] = 0;
2698           len--;
2699         }
2700
2701         if (len < 1)
2702         {
2703           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2704           return (4);
2705         }
2706
2707         _config_base_dir_len = len;
2708       }
2709       break;
2710
2711       case 'p':
2712       {
2713         if (config_pid_file != NULL)
2714           free (config_pid_file);
2715         config_pid_file = strdup (optarg);
2716         if (config_pid_file == NULL)
2717         {
2718           fprintf (stderr, "read_options: strdup failed.\n");
2719           return (3);
2720         }
2721       }
2722       break;
2723
2724       case 'F':
2725         config_flush_at_shutdown = 1;
2726         break;
2727
2728       case 'j':
2729       {
2730         struct stat statbuf;
2731         const char *dir = optarg;
2732
2733         status = stat(dir, &statbuf);
2734         if (status != 0)
2735         {
2736           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2737           return 6;
2738         }
2739
2740         if (!S_ISDIR(statbuf.st_mode)
2741             || access(dir, R_OK|W_OK|X_OK) != 0)
2742         {
2743           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2744                   errno ? rrd_strerror(errno) : "");
2745           return 6;
2746         }
2747
2748         journal_cur = malloc(PATH_MAX + 1);
2749         journal_old = malloc(PATH_MAX + 1);
2750         if (journal_cur == NULL || journal_old == NULL)
2751         {
2752           fprintf(stderr, "malloc failure for journal files\n");
2753           return 6;
2754         }
2755         else 
2756         {
2757           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2758           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2759         }
2760       }
2761       break;
2762
2763       case 'h':
2764       case '?':
2765         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2766             "\n"
2767             "Usage: rrdcached [options]\n"
2768             "\n"
2769             "Valid options are:\n"
2770             "  -l <address>  Socket address to listen to.\n"
2771             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2772             "  -w <seconds>  Interval in which to write data.\n"
2773             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2774             "  -t <threads>  Number of write threads.\n"
2775             "  -f <seconds>  Interval in which to flush dead data.\n"
2776             "  -p <file>     Location of the PID-file.\n"
2777             "  -b <dir>      Base directory to change to.\n"
2778             "  -B            Restrict file access to paths within -b <dir>\n"
2779             "  -g            Do not fork and run in the foreground.\n"
2780             "  -j <dir>      Directory in which to create the journal files.\n"
2781             "  -F            Always flush all updates at shutdown\n"
2782             "\n"
2783             "For more information and a detailed description of all options "
2784             "please refer\n"
2785             "to the rrdcached(1) manual page.\n",
2786             VERSION);
2787         status = -1;
2788         break;
2789     } /* switch (option) */
2790   } /* while (getopt) */
2791
2792   /* advise the user when values are not sane */
2793   if (config_flush_interval < 2 * config_write_interval)
2794     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2795             " 2x write interval (-w) !\n");
2796   if (config_write_jitter > config_write_interval)
2797     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2798             " write interval (-w) !\n");
2799
2800   if (config_write_base_only && config_base_dir == NULL)
2801     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2802             "  Consult the rrdcached documentation\n");
2803
2804   if (journal_cur == NULL)
2805     config_flush_at_shutdown = 1;
2806
2807   return (status);
2808 } /* }}} int read_options */
2809
2810 int main (int argc, char **argv)
2811 {
2812   int status;
2813
2814   status = read_options (argc, argv);
2815   if (status != 0)
2816   {
2817     if (status < 0)
2818       status = 0;
2819     return (status);
2820   }
2821
2822   status = daemonize ();
2823   if (status != 0)
2824   {
2825     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2826     return (1);
2827   }
2828
2829   journal_init();
2830
2831   /* start the queue threads */
2832   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2833   if (queue_threads == NULL)
2834   {
2835     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2836     cleanup();
2837     return (1);
2838   }
2839   for (int i = 0; i < config_queue_threads; i++)
2840   {
2841     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2842     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2843     if (status != 0)
2844     {
2845       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2846       cleanup();
2847       return (1);
2848     }
2849   }
2850
2851   /* start the flush thread */
2852   memset(&flush_thread, 0, sizeof(flush_thread));
2853   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2854   if (status != 0)
2855   {
2856     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2857     cleanup();
2858     return (1);
2859   }
2860
2861   listen_thread_main (NULL);
2862   cleanup ();
2863
2864   return (0);
2865 } /* int main */
2866
2867 /*
2868  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2869  */