f997d3ceddbd4060add089914cf95b80b86b3fcf
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 static int do_shutdown = 0;
219
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
223
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
230
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
236
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
245
246 static listen_socket_t **config_listen_address_list = NULL;
247 static size_t config_listen_address_list_len = 0;
248
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
266
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
269
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
280
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
282 {
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
285
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
287 {
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
290
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
296
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
302
303 static void install_signal_handlers(void) /* {{{ */
304 {
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
312
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
317
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
321
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
329
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
333
334 } /* }}} void install_signal_handlers */
335
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338   int fd;
339   char *file;
340
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
344
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
349
350   return(fd);
351 } /* }}} static int open_pidfile */
352
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
355 {
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
359
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
363
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
366
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
370
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
380
381   lseek(pid_fd, 0, SEEK_SET);
382   if (ftruncate(pid_fd, 0) == -1)
383   {
384     fprintf(stderr,
385             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
386     close(pid_fd);
387     return -1;
388   }
389
390   fprintf(stderr,
391           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
392           "rrdcached: starting normally.\n", pid);
393
394   return pid_fd;
395 } /* }}} static int check_pidfile */
396
397 static int write_pidfile (int fd) /* {{{ */
398 {
399   pid_t pid;
400   FILE *fh;
401
402   pid = getpid ();
403
404   fh = fdopen (fd, "w");
405   if (fh == NULL)
406   {
407     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
408     close(fd);
409     return (-1);
410   }
411
412   fprintf (fh, "%i\n", (int) pid);
413   fclose (fh);
414
415   return (0);
416 } /* }}} int write_pidfile */
417
418 static int remove_pidfile (void) /* {{{ */
419 {
420   char *file;
421   int status;
422
423   file = (config_pid_file != NULL)
424     ? config_pid_file
425     : LOCALSTATEDIR "/run/rrdcached.pid";
426
427   status = unlink (file);
428   if (status == 0)
429     return (0);
430   return (errno);
431 } /* }}} int remove_pidfile */
432
433 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
434 {
435   char *eol;
436
437   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
438                sock->next_read - sock->next_cmd);
439
440   if (eol == NULL)
441   {
442     /* no commands left, move remainder back to front of rbuf */
443     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
444             sock->next_read - sock->next_cmd);
445     sock->next_read -= sock->next_cmd;
446     sock->next_cmd = 0;
447     *len = 0;
448     return NULL;
449   }
450   else
451   {
452     char *cmd = sock->rbuf + sock->next_cmd;
453     *eol = '\0';
454
455     sock->next_cmd = eol - sock->rbuf + 1;
456
457     if (eol > sock->rbuf && *(eol-1) == '\r')
458       *(--eol) = '\0'; /* handle "\r\n" EOL */
459
460     *len = eol - cmd;
461
462     return cmd;
463   }
464
465   /* NOTREACHED */
466   assert(1==0);
467 }
468
469 /* add the characters directly to the write buffer */
470 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
471 {
472   char *new_buf;
473
474   assert(sock != NULL);
475
476   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
477   if (new_buf == NULL)
478   {
479     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
480     return -1;
481   }
482
483   strncpy(new_buf + sock->wbuf_len, str, len + 1);
484
485   sock->wbuf = new_buf;
486   sock->wbuf_len += len;
487
488   return 0;
489 } /* }}} static int add_to_wbuf */
490
491 /* add the text to the "extra" info that's sent after the status line */
492 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
493 {
494   va_list argp;
495   char buffer[CMD_MAX];
496   int len;
497
498   if (sock == NULL) return 0; /* journal replay mode */
499   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
500
501   va_start(argp, fmt);
502 #ifdef HAVE_VSNPRINTF
503   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
504 #else
505   len = vsprintf(buffer, fmt, argp);
506 #endif
507   va_end(argp);
508   if (len < 0)
509   {
510     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
511     return -1;
512   }
513
514   return add_to_wbuf(sock, buffer, len);
515 } /* }}} static int add_response_info */
516
517 static int count_lines(char *str) /* {{{ */
518 {
519   int lines = 0;
520
521   if (str != NULL)
522   {
523     while ((str = strchr(str, '\n')) != NULL)
524     {
525       ++lines;
526       ++str;
527     }
528   }
529
530   return lines;
531 } /* }}} static int count_lines */
532
533 /* send the response back to the user.
534  * returns 0 on success, -1 on error
535  * write buffer is always zeroed after this call */
536 static int send_response (listen_socket_t *sock, response_code rc,
537                           char *fmt, ...) /* {{{ */
538 {
539   va_list argp;
540   char buffer[CMD_MAX];
541   int lines;
542   ssize_t wrote;
543   int rclen, len;
544
545   if (sock == NULL) return rc;  /* journal replay mode */
546
547   if (sock->batch_start)
548   {
549     if (rc == RESP_OK)
550       return rc; /* no response on success during BATCH */
551     lines = sock->batch_cmd;
552   }
553   else if (rc == RESP_OK)
554     lines = count_lines(sock->wbuf);
555   else
556     lines = -1;
557
558   rclen = sprintf(buffer, "%d ", lines);
559   va_start(argp, fmt);
560 #ifdef HAVE_VSNPRINTF
561   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
562 #else
563   len = vsprintf(buffer+rclen, fmt, argp);
564 #endif
565   va_end(argp);
566   if (len < 0)
567     return -1;
568
569   len += rclen;
570
571   /* append the result to the wbuf, don't write to the user */
572   if (sock->batch_start)
573     return add_to_wbuf(sock, buffer, len);
574
575   /* first write must be complete */
576   if (len != write(sock->fd, buffer, len))
577   {
578     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
579     return -1;
580   }
581
582   if (sock->wbuf != NULL && rc == RESP_OK)
583   {
584     wrote = 0;
585     while (wrote < sock->wbuf_len)
586     {
587       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
588       if (wb <= 0)
589       {
590         RRDD_LOG(LOG_INFO, "send_response: could not write results");
591         return -1;
592       }
593       wrote += wb;
594     }
595   }
596
597   free(sock->wbuf); sock->wbuf = NULL;
598   sock->wbuf_len = 0;
599
600   return 0;
601 } /* }}} */
602
603 static void wipe_ci_values(cache_item_t *ci, time_t when)
604 {
605   ci->values = NULL;
606   ci->values_num = 0;
607
608   ci->last_flush_time = when;
609   if (config_write_jitter > 0)
610     ci->last_flush_time += (rrd_random() % config_write_jitter);
611 }
612
613 /* remove_from_queue
614  * remove a "cache_item_t" item from the queue.
615  * must hold 'cache_lock' when calling this
616  */
617 static void remove_from_queue(cache_item_t *ci) /* {{{ */
618 {
619   if (ci == NULL) return;
620   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
621
622   if (ci->prev == NULL)
623     cache_queue_head = ci->next; /* reset head */
624   else
625     ci->prev->next = ci->next;
626
627   if (ci->next == NULL)
628     cache_queue_tail = ci->prev; /* reset the tail */
629   else
630     ci->next->prev = ci->prev;
631
632   ci->next = ci->prev = NULL;
633   ci->flags &= ~CI_FLAGS_IN_QUEUE;
634
635   pthread_mutex_lock (&stats_lock);
636   assert (stats_queue_length > 0);
637   stats_queue_length--;
638   pthread_mutex_unlock (&stats_lock);
639
640 } /* }}} static void remove_from_queue */
641
642 /* free the resources associated with the cache_item_t
643  * must hold cache_lock when calling this function
644  */
645 static void *free_cache_item(cache_item_t *ci) /* {{{ */
646 {
647   if (ci == NULL) return NULL;
648
649   remove_from_queue(ci);
650
651   for (size_t i=0; i < ci->values_num; i++)
652     free(ci->values[i]);
653
654   free (ci->values);
655   free (ci->file);
656
657   /* in case anyone is waiting */
658   pthread_cond_broadcast(&ci->flushed);
659
660   free (ci);
661
662   return NULL;
663 } /* }}} static void *free_cache_item */
664
665 /*
666  * enqueue_cache_item:
667  * `cache_lock' must be acquired before calling this function!
668  */
669 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
670     queue_side_t side)
671 {
672   if (ci == NULL)
673     return (-1);
674
675   if (ci->values_num == 0)
676     return (0);
677
678   if (side == HEAD)
679   {
680     if (cache_queue_head == ci)
681       return 0;
682
683     /* remove if further down in queue */
684     remove_from_queue(ci);
685
686     ci->prev = NULL;
687     ci->next = cache_queue_head;
688     if (ci->next != NULL)
689       ci->next->prev = ci;
690     cache_queue_head = ci;
691
692     if (cache_queue_tail == NULL)
693       cache_queue_tail = cache_queue_head;
694   }
695   else /* (side == TAIL) */
696   {
697     /* We don't move values back in the list.. */
698     if (ci->flags & CI_FLAGS_IN_QUEUE)
699       return (0);
700
701     assert (ci->next == NULL);
702     assert (ci->prev == NULL);
703
704     ci->prev = cache_queue_tail;
705
706     if (cache_queue_tail == NULL)
707       cache_queue_head = ci;
708     else
709       cache_queue_tail->next = ci;
710
711     cache_queue_tail = ci;
712   }
713
714   ci->flags |= CI_FLAGS_IN_QUEUE;
715
716   pthread_cond_signal(&queue_cond);
717   pthread_mutex_lock (&stats_lock);
718   stats_queue_length++;
719   pthread_mutex_unlock (&stats_lock);
720
721   return (0);
722 } /* }}} int enqueue_cache_item */
723
724 /*
725  * tree_callback_flush:
726  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
727  * while this is in progress.
728  */
729 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
730     gpointer data)
731 {
732   cache_item_t *ci;
733   callback_flush_data_t *cfd;
734
735   ci = (cache_item_t *) value;
736   cfd = (callback_flush_data_t *) data;
737
738   if (ci->flags & CI_FLAGS_IN_QUEUE)
739     return FALSE;
740
741   if ((ci->last_flush_time <= cfd->abs_timeout)
742       && (ci->values_num > 0))
743   {
744     enqueue_cache_item (ci, TAIL);
745   }
746   else if ((do_shutdown != 0)
747       && (ci->values_num > 0))
748   {
749     enqueue_cache_item (ci, TAIL);
750   }
751   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752       && (ci->values_num <= 0))
753   {
754     assert ((char *) key == ci->file);
755     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756     {
757       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758       return (FALSE);
759     }
760   }
761
762   return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
764
765 static int flush_old_values (int max_age)
766 {
767   callback_flush_data_t cfd;
768   size_t k;
769
770   memset (&cfd, 0, sizeof (cfd));
771   /* Pass the current time as user data so that we don't need to call
772    * `time' for each node. */
773   cfd.now = time (NULL);
774   cfd.keys = NULL;
775   cfd.keys_num = 0;
776
777   if (max_age > 0)
778     cfd.abs_timeout = cfd.now - max_age;
779   else
780     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
781
782   /* `tree_callback_flush' will return the keys of all values that haven't
783    * been touched in the last `config_flush_interval' seconds in `cfd'.
784    * The char*'s in this array point to the same memory as ci->file, so we
785    * don't need to free them separately. */
786   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
787
788   for (k = 0; k < cfd.keys_num; k++)
789   {
790     /* should never fail, since we have held the cache_lock
791      * the entire time */
792     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793   }
794
795   if (cfd.keys != NULL)
796   {
797     free (cfd.keys);
798     cfd.keys = NULL;
799   }
800
801   return (0);
802 } /* int flush_old_values */
803
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
805 {
806   struct timeval now;
807   struct timespec next_flush;
808   int status;
809
810   gettimeofday (&now, NULL);
811   next_flush.tv_sec = now.tv_sec + config_flush_interval;
812   next_flush.tv_nsec = 1000 * now.tv_usec;
813
814   pthread_mutex_lock(&cache_lock);
815
816   while (!do_shutdown)
817   {
818     gettimeofday (&now, NULL);
819     if ((now.tv_sec > next_flush.tv_sec)
820         || ((now.tv_sec == next_flush.tv_sec)
821           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822     {
823       /* Flush all values that haven't been written in the last
824        * `config_write_interval' seconds. */
825       flush_old_values (config_write_interval);
826
827       /* Determine the time of the next cache flush. */
828       next_flush.tv_sec =
829         now.tv_sec + next_flush.tv_sec % config_flush_interval;
830
831       /* unlock the cache while we rotate so we don't block incoming
832        * updates if the fsync() blocks on disk I/O */
833       pthread_mutex_unlock(&cache_lock);
834       journal_rotate();
835       pthread_mutex_lock(&cache_lock);
836     }
837
838     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
839     if (status != 0 && status != ETIMEDOUT)
840     {
841       RRDD_LOG (LOG_ERR, "flush_thread_main: "
842                 "pthread_cond_timedwait returned %i.", status);
843     }
844   }
845
846   if (config_flush_at_shutdown)
847     flush_old_values (-1); /* flush everything */
848
849   pthread_mutex_unlock(&cache_lock);
850
851   return NULL;
852 } /* void *flush_thread_main */
853
854 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
855 {
856   pthread_mutex_lock (&cache_lock);
857
858   while (!do_shutdown
859          || (cache_queue_head != NULL && config_flush_at_shutdown))
860   {
861     cache_item_t *ci;
862     char *file;
863     char **values;
864     size_t values_num;
865     int status;
866
867     /* Now, check if there's something to store away. If not, wait until
868      * something comes in.  if we are shutting down, do not wait around.  */
869     if (cache_queue_head == NULL && !do_shutdown)
870     {
871       status = pthread_cond_wait (&queue_cond, &cache_lock);
872       if ((status != 0) && (status != ETIMEDOUT))
873       {
874         RRDD_LOG (LOG_ERR, "queue_thread_main: "
875             "pthread_cond_wait returned %i.", status);
876       }
877     }
878
879     /* Check if a value has arrived. This may be NULL if we timed out or there
880      * was an interrupt such as a signal. */
881     if (cache_queue_head == NULL)
882       continue;
883
884     ci = cache_queue_head;
885
886     /* copy the relevant parts */
887     file = strdup (ci->file);
888     if (file == NULL)
889     {
890       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
891       continue;
892     }
893
894     assert(ci->values != NULL);
895     assert(ci->values_num > 0);
896
897     values = ci->values;
898     values_num = ci->values_num;
899
900     wipe_ci_values(ci, time(NULL));
901     remove_from_queue(ci);
902
903     pthread_mutex_unlock (&cache_lock);
904
905     rrd_clear_error ();
906     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
907     if (status != 0)
908     {
909       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
910           "rrd_update_r (%s) failed with status %i. (%s)",
911           file, status, rrd_get_error());
912     }
913
914     journal_write("wrote", file);
915     pthread_cond_broadcast(&ci->flushed);
916
917     rrd_free_ptrs((void ***) &values, &values_num);
918     free(file);
919
920     if (status == 0)
921     {
922       pthread_mutex_lock (&stats_lock);
923       stats_updates_written++;
924       stats_data_sets_written += values_num;
925       pthread_mutex_unlock (&stats_lock);
926     }
927
928     pthread_mutex_lock (&cache_lock);
929   }
930   pthread_mutex_unlock (&cache_lock);
931
932   return (NULL);
933 } /* }}} void *queue_thread_main */
934
935 static int buffer_get_field (char **buffer_ret, /* {{{ */
936     size_t *buffer_size_ret, char **field_ret)
937 {
938   char *buffer;
939   size_t buffer_pos;
940   size_t buffer_size;
941   char *field;
942   size_t field_size;
943   int status;
944
945   buffer = *buffer_ret;
946   buffer_pos = 0;
947   buffer_size = *buffer_size_ret;
948   field = *buffer_ret;
949   field_size = 0;
950
951   if (buffer_size <= 0)
952     return (-1);
953
954   /* This is ensured by `handle_request'. */
955   assert (buffer[buffer_size - 1] == '\0');
956
957   status = -1;
958   while (buffer_pos < buffer_size)
959   {
960     /* Check for end-of-field or end-of-buffer */
961     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
962     {
963       field[field_size] = 0;
964       field_size++;
965       buffer_pos++;
966       status = 0;
967       break;
968     }
969     /* Handle escaped characters. */
970     else if (buffer[buffer_pos] == '\\')
971     {
972       if (buffer_pos >= (buffer_size - 1))
973         break;
974       buffer_pos++;
975       field[field_size] = buffer[buffer_pos];
976       field_size++;
977       buffer_pos++;
978     }
979     /* Normal operation */ 
980     else
981     {
982       field[field_size] = buffer[buffer_pos];
983       field_size++;
984       buffer_pos++;
985     }
986   } /* while (buffer_pos < buffer_size) */
987
988   if (status != 0)
989     return (status);
990
991   *buffer_ret = buffer + buffer_pos;
992   *buffer_size_ret = buffer_size - buffer_pos;
993   *field_ret = field;
994
995   return (0);
996 } /* }}} int buffer_get_field */
997
998 /* if we're restricting writes to the base directory,
999  * check whether the file falls within the dir
1000  * returns 1 if OK, otherwise 0
1001  */
1002 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1003 {
1004   assert(file != NULL);
1005
1006   if (!config_write_base_only
1007       || sock == NULL /* journal replay */
1008       || config_base_dir == NULL)
1009     return 1;
1010
1011   if (strstr(file, "../") != NULL) goto err;
1012
1013   /* relative paths without "../" are ok */
1014   if (*file != '/') return 1;
1015
1016   /* file must be of the format base + "/" + <1+ char filename> */
1017   if (strlen(file) < _config_base_dir_len + 2) goto err;
1018   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1019   if (*(file + _config_base_dir_len) != '/') goto err;
1020
1021   return 1;
1022
1023 err:
1024   if (sock != NULL && sock->fd >= 0)
1025     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1026
1027   return 0;
1028 } /* }}} static int check_file_access */
1029
1030 /* when using a base dir, convert relative paths to absolute paths.
1031  * if necessary, modifies the "filename" pointer to point
1032  * to the new path created in "tmp".  "tmp" is provided
1033  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1034  *
1035  * this allows us to optimize for the expected case (absolute path)
1036  * with a no-op.
1037  */
1038 static void get_abs_path(char **filename, char *tmp)
1039 {
1040   assert(tmp != NULL);
1041   assert(filename != NULL && *filename != NULL);
1042
1043   if (config_base_dir == NULL || **filename == '/')
1044     return;
1045
1046   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1047   *filename = tmp;
1048 } /* }}} static int get_abs_path */
1049
1050 /* returns 1 if we have the required privilege level,
1051  * otherwise issue an error to the user on sock */
1052 static int has_privilege (listen_socket_t *sock, /* {{{ */
1053                           socket_privilege priv)
1054 {
1055   if (sock == NULL) /* journal replay */
1056     return 1;
1057
1058   if (sock->privilege >= priv)
1059     return 1;
1060
1061   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1062 } /* }}} static int has_privilege */
1063
1064 static int flush_file (const char *filename) /* {{{ */
1065 {
1066   cache_item_t *ci;
1067
1068   pthread_mutex_lock (&cache_lock);
1069
1070   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1071   if (ci == NULL)
1072   {
1073     pthread_mutex_unlock (&cache_lock);
1074     return (ENOENT);
1075   }
1076
1077   if (ci->values_num > 0)
1078   {
1079     /* Enqueue at head */
1080     enqueue_cache_item (ci, HEAD);
1081     pthread_cond_wait(&ci->flushed, &cache_lock);
1082   }
1083
1084   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1085    * may have been purged during our cond_wait() */
1086
1087   pthread_mutex_unlock(&cache_lock);
1088
1089   return (0);
1090 } /* }}} int flush_file */
1091
1092 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1093 {
1094   char *err = "Syntax error.\n";
1095
1096   if (cmd && cmd->syntax)
1097     err = cmd->syntax;
1098
1099   return send_response(sock, RESP_ERR, "Usage: %s", err);
1100 } /* }}} static int syntax_error() */
1101
1102 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1103 {
1104   uint64_t copy_queue_length;
1105   uint64_t copy_updates_received;
1106   uint64_t copy_flush_received;
1107   uint64_t copy_updates_written;
1108   uint64_t copy_data_sets_written;
1109   uint64_t copy_journal_bytes;
1110   uint64_t copy_journal_rotate;
1111
1112   uint64_t tree_nodes_number;
1113   uint64_t tree_depth;
1114
1115   pthread_mutex_lock (&stats_lock);
1116   copy_queue_length       = stats_queue_length;
1117   copy_updates_received   = stats_updates_received;
1118   copy_flush_received     = stats_flush_received;
1119   copy_updates_written    = stats_updates_written;
1120   copy_data_sets_written  = stats_data_sets_written;
1121   copy_journal_bytes      = stats_journal_bytes;
1122   copy_journal_rotate     = stats_journal_rotate;
1123   pthread_mutex_unlock (&stats_lock);
1124
1125   pthread_mutex_lock (&cache_lock);
1126   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1127   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1128   pthread_mutex_unlock (&cache_lock);
1129
1130   add_response_info(sock,
1131                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1132   add_response_info(sock,
1133                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1134   add_response_info(sock,
1135                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1136   add_response_info(sock,
1137                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1138   add_response_info(sock,
1139                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1140   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1141   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1142   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1143   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1144
1145   send_response(sock, RESP_OK, "Statistics follow\n");
1146
1147   return (0);
1148 } /* }}} int handle_request_stats */
1149
1150 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1151 {
1152   char *file, file_tmp[PATH_MAX];
1153   int status;
1154
1155   status = buffer_get_field (&buffer, &buffer_size, &file);
1156   if (status != 0)
1157   {
1158     return syntax_error(sock,cmd);
1159   }
1160   else
1161   {
1162     pthread_mutex_lock(&stats_lock);
1163     stats_flush_received++;
1164     pthread_mutex_unlock(&stats_lock);
1165
1166     get_abs_path(&file, file_tmp);
1167     if (!check_file_access(file, sock)) return 0;
1168
1169     status = flush_file (file);
1170     if (status == 0)
1171       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1172     else if (status == ENOENT)
1173     {
1174       /* no file in our tree; see whether it exists at all */
1175       struct stat statbuf;
1176
1177       memset(&statbuf, 0, sizeof(statbuf));
1178       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1179         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1180       else
1181         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1182     }
1183     else if (status < 0)
1184       return send_response(sock, RESP_ERR, "Internal error.\n");
1185     else
1186       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1187   }
1188
1189   /* NOTREACHED */
1190   assert(1==0);
1191 } /* }}} int handle_request_flush */
1192
1193 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1194 {
1195   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1196
1197   pthread_mutex_lock(&cache_lock);
1198   flush_old_values(-1);
1199   pthread_mutex_unlock(&cache_lock);
1200
1201   return send_response(sock, RESP_OK, "Started flush.\n");
1202 } /* }}} static int handle_request_flushall */
1203
1204 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1205 {
1206   int status;
1207   char *file, file_tmp[PATH_MAX];
1208   cache_item_t *ci;
1209
1210   status = buffer_get_field(&buffer, &buffer_size, &file);
1211   if (status != 0)
1212     return syntax_error(sock,cmd);
1213
1214   get_abs_path(&file, file_tmp);
1215
1216   pthread_mutex_lock(&cache_lock);
1217   ci = g_tree_lookup(cache_tree, file);
1218   if (ci == NULL)
1219   {
1220     pthread_mutex_unlock(&cache_lock);
1221     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1222   }
1223
1224   for (size_t i=0; i < ci->values_num; i++)
1225     add_response_info(sock, "%s\n", ci->values[i]);
1226
1227   pthread_mutex_unlock(&cache_lock);
1228   return send_response(sock, RESP_OK, "updates pending\n");
1229 } /* }}} static int handle_request_pending */
1230
1231 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1232 {
1233   int status;
1234   gboolean found;
1235   char *file, file_tmp[PATH_MAX];
1236
1237   status = buffer_get_field(&buffer, &buffer_size, &file);
1238   if (status != 0)
1239     return syntax_error(sock,cmd);
1240
1241   get_abs_path(&file, file_tmp);
1242   if (!check_file_access(file, sock)) return 0;
1243
1244   pthread_mutex_lock(&cache_lock);
1245   found = g_tree_remove(cache_tree, file);
1246   pthread_mutex_unlock(&cache_lock);
1247
1248   if (found == TRUE)
1249   {
1250     if (sock != NULL)
1251       journal_write("forget", file);
1252
1253     return send_response(sock, RESP_OK, "Gone!\n");
1254   }
1255   else
1256     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1257
1258   /* NOTREACHED */
1259   assert(1==0);
1260 } /* }}} static int handle_request_forget */
1261
1262 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1263 {
1264   cache_item_t *ci;
1265
1266   pthread_mutex_lock(&cache_lock);
1267
1268   ci = cache_queue_head;
1269   while (ci != NULL)
1270   {
1271     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1272     ci = ci->next;
1273   }
1274
1275   pthread_mutex_unlock(&cache_lock);
1276
1277   return send_response(sock, RESP_OK, "in queue.\n");
1278 } /* }}} int handle_request_queue */
1279
1280 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1281 {
1282   char *file, file_tmp[PATH_MAX];
1283   int values_num = 0;
1284   int status;
1285   char orig_buf[CMD_MAX];
1286
1287   cache_item_t *ci;
1288
1289   /* save it for the journal later */
1290   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1291
1292   status = buffer_get_field (&buffer, &buffer_size, &file);
1293   if (status != 0)
1294     return syntax_error(sock,cmd);
1295
1296   pthread_mutex_lock(&stats_lock);
1297   stats_updates_received++;
1298   pthread_mutex_unlock(&stats_lock);
1299
1300   get_abs_path(&file, file_tmp);
1301   if (!check_file_access(file, sock)) return 0;
1302
1303   pthread_mutex_lock (&cache_lock);
1304   ci = g_tree_lookup (cache_tree, file);
1305
1306   if (ci == NULL) /* {{{ */
1307   {
1308     struct stat statbuf;
1309
1310     /* don't hold the lock while we setup; stat(2) might block */
1311     pthread_mutex_unlock(&cache_lock);
1312
1313     memset (&statbuf, 0, sizeof (statbuf));
1314     status = stat (file, &statbuf);
1315     if (status != 0)
1316     {
1317       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1318
1319       status = errno;
1320       if (status == ENOENT)
1321         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1322       else
1323         return send_response(sock, RESP_ERR,
1324                              "stat failed with error %i.\n", status);
1325     }
1326     if (!S_ISREG (statbuf.st_mode))
1327       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1328
1329     if (access(file, R_OK|W_OK) != 0)
1330       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1331                            file, rrd_strerror(errno));
1332
1333     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1334     if (ci == NULL)
1335     {
1336       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1337
1338       return send_response(sock, RESP_ERR, "malloc failed.\n");
1339     }
1340     memset (ci, 0, sizeof (cache_item_t));
1341
1342     ci->file = strdup (file);
1343     if (ci->file == NULL)
1344     {
1345       free (ci);
1346       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1347
1348       return send_response(sock, RESP_ERR, "strdup failed.\n");
1349     }
1350
1351     wipe_ci_values(ci, now);
1352     ci->flags = CI_FLAGS_IN_TREE;
1353     pthread_cond_init(&ci->flushed, NULL);
1354
1355     pthread_mutex_lock(&cache_lock);
1356     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1357   } /* }}} */
1358   assert (ci != NULL);
1359
1360   /* don't re-write updates in replay mode */
1361   if (sock != NULL)
1362     journal_write("update", orig_buf);
1363
1364   while (buffer_size > 0)
1365   {
1366     char *value;
1367     time_t stamp;
1368     char *eostamp;
1369
1370     status = buffer_get_field (&buffer, &buffer_size, &value);
1371     if (status != 0)
1372     {
1373       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1374       break;
1375     }
1376
1377     /* make sure update time is always moving forward */
1378     stamp = strtol(value, &eostamp, 10);
1379     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1380     {
1381       pthread_mutex_unlock(&cache_lock);
1382       return send_response(sock, RESP_ERR,
1383                            "Cannot find timestamp in '%s'!\n", value);
1384     }
1385     else if (stamp <= ci->last_update_stamp)
1386     {
1387       pthread_mutex_unlock(&cache_lock);
1388       return send_response(sock, RESP_ERR,
1389                            "illegal attempt to update using time %ld when last"
1390                            " update time is %ld (minimum one second step)\n",
1391                            stamp, ci->last_update_stamp);
1392     }
1393     else
1394       ci->last_update_stamp = stamp;
1395
1396     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1397     {
1398       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1399       continue;
1400     }
1401
1402     values_num++;
1403   }
1404
1405   if (((now - ci->last_flush_time) >= config_write_interval)
1406       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1407       && (ci->values_num > 0))
1408   {
1409     enqueue_cache_item (ci, TAIL);
1410   }
1411
1412   pthread_mutex_unlock (&cache_lock);
1413
1414   if (values_num < 1)
1415     return send_response(sock, RESP_ERR, "No values updated.\n");
1416   else
1417     return send_response(sock, RESP_OK,
1418                          "errors, enqueued %i value(s).\n", values_num);
1419
1420   /* NOTREACHED */
1421   assert(1==0);
1422
1423 } /* }}} int handle_request_update */
1424
1425 /* we came across a "WROTE" entry during journal replay.
1426  * throw away any values that we have accumulated for this file
1427  */
1428 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1429 {
1430   cache_item_t *ci;
1431   const char *file = buffer;
1432
1433   pthread_mutex_lock(&cache_lock);
1434
1435   ci = g_tree_lookup(cache_tree, file);
1436   if (ci == NULL)
1437   {
1438     pthread_mutex_unlock(&cache_lock);
1439     return (0);
1440   }
1441
1442   if (ci->values)
1443     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1444
1445   wipe_ci_values(ci, now);
1446   remove_from_queue(ci);
1447
1448   pthread_mutex_unlock(&cache_lock);
1449   return (0);
1450 } /* }}} int handle_request_wrote */
1451
1452 /* start "BATCH" processing */
1453 static int batch_start (HANDLER_PROTO) /* {{{ */
1454 {
1455   int status;
1456   if (sock->batch_start)
1457     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1458
1459   status = send_response(sock, RESP_OK,
1460                          "Go ahead.  End with dot '.' on its own line.\n");
1461   sock->batch_start = time(NULL);
1462   sock->batch_cmd = 0;
1463
1464   return status;
1465 } /* }}} static int batch_start */
1466
1467 /* finish "BATCH" processing and return results to the client */
1468 static int batch_done (HANDLER_PROTO) /* {{{ */
1469 {
1470   assert(sock->batch_start);
1471   sock->batch_start = 0;
1472   sock->batch_cmd  = 0;
1473   return send_response(sock, RESP_OK, "errors\n");
1474 } /* }}} static int batch_done */
1475
1476 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1477 {
1478   return -1;
1479 } /* }}} static int handle_request_quit */
1480
1481 struct command COMMANDS[] = {
1482   {
1483     "UPDATE",
1484     handle_request_update,
1485     PRIV_HIGH,
1486     CMD_CONTEXT_ANY,
1487     "UPDATE <filename> <values> [<values> ...]\n"
1488     ,
1489     "Adds the given file to the internal cache if it is not yet known and\n"
1490     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1491     "for details.\n"
1492     "\n"
1493     "Each <values> has the following form:\n"
1494     "  <values> = <time>:<value>[:<value>[...]]\n"
1495     "See the rrdupdate(1) manpage for details.\n"
1496   },
1497   {
1498     "WROTE",
1499     handle_request_wrote,
1500     PRIV_HIGH,
1501     CMD_CONTEXT_JOURNAL,
1502     NULL,
1503     NULL
1504   },
1505   {
1506     "FLUSH",
1507     handle_request_flush,
1508     PRIV_LOW,
1509     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1510     "FLUSH <filename>\n"
1511     ,
1512     "Adds the given filename to the head of the update queue and returns\n"
1513     "after it has been dequeued.\n"
1514   },
1515   {
1516     "FLUSHALL",
1517     handle_request_flushall,
1518     PRIV_HIGH,
1519     CMD_CONTEXT_CLIENT,
1520     "FLUSHALL\n"
1521     ,
1522     "Triggers writing of all pending updates.  Returns immediately.\n"
1523   },
1524   {
1525     "PENDING",
1526     handle_request_pending,
1527     PRIV_HIGH,
1528     CMD_CONTEXT_CLIENT,
1529     "PENDING <filename>\n"
1530     ,
1531     "Shows any 'pending' updates for a file, in order.\n"
1532     "The updates shown have not yet been written to the underlying RRD file.\n"
1533   },
1534   {
1535     "FORGET",
1536     handle_request_forget,
1537     PRIV_HIGH,
1538     CMD_CONTEXT_ANY,
1539     "FORGET <filename>\n"
1540     ,
1541     "Removes the file completely from the cache.\n"
1542     "Any pending updates for the file will be lost.\n"
1543   },
1544   {
1545     "QUEUE",
1546     handle_request_queue,
1547     PRIV_LOW,
1548     CMD_CONTEXT_CLIENT,
1549     "QUEUE\n"
1550     ,
1551         "Shows all files in the output queue.\n"
1552     "The output is zero or more lines in the following format:\n"
1553     "(where <num_vals> is the number of values to be written)\n"
1554     "\n"
1555     "<num_vals> <filename>\n"
1556   },
1557   {
1558     "STATS",
1559     handle_request_stats,
1560     PRIV_LOW,
1561     CMD_CONTEXT_CLIENT,
1562     "STATS\n"
1563     ,
1564     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1565     "a description of the values.\n"
1566   },
1567   {
1568     "HELP",
1569     handle_request_help,
1570     PRIV_LOW,
1571     CMD_CONTEXT_CLIENT,
1572     "HELP [<command>]\n",
1573     NULL, /* special! */
1574   },
1575   {
1576     "BATCH",
1577     batch_start,
1578     PRIV_LOW,
1579     CMD_CONTEXT_CLIENT,
1580     "BATCH\n"
1581     ,
1582     "The 'BATCH' command permits the client to initiate a bulk load\n"
1583     "   of commands to rrdcached.\n"
1584     "\n"
1585     "Usage:\n"
1586     "\n"
1587     "    client: BATCH\n"
1588     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1589     "    client: command #1\n"
1590     "    client: command #2\n"
1591     "    client: ... and so on\n"
1592     "    client: .\n"
1593     "    server: 2 errors\n"
1594     "    server: 7 message for command #7\n"
1595     "    server: 9 message for command #9\n"
1596     "\n"
1597     "For more information, consult the rrdcached(1) documentation.\n"
1598   },
1599   {
1600     ".",   /* BATCH terminator */
1601     batch_done,
1602     PRIV_LOW,
1603     CMD_CONTEXT_BATCH,
1604     NULL,
1605     NULL
1606   },
1607   {
1608     "QUIT",
1609     handle_request_quit,
1610     PRIV_LOW,
1611     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1612     "QUIT\n"
1613     ,
1614     "Disconnect from rrdcached.\n"
1615   },
1616   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1617 };
1618
1619 static struct command *find_command(char *cmd)
1620 {
1621   struct command *c = COMMANDS;
1622
1623   while (c->cmd != NULL)
1624   {
1625     if (strcasecmp(cmd, c->cmd) == 0)
1626       break;
1627     c++;
1628   }
1629
1630   if (c->cmd == NULL)
1631     return NULL;
1632   else
1633     return c;
1634 }
1635
1636 /* check whether commands are received in the expected context */
1637 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1638 {
1639   if (sock == NULL)
1640     return (cmd->context & CMD_CONTEXT_JOURNAL);
1641   else if (sock->batch_start)
1642     return (cmd->context & CMD_CONTEXT_BATCH);
1643   else
1644     return (cmd->context & CMD_CONTEXT_CLIENT);
1645
1646   /* NOTREACHED */
1647   assert(1==0);
1648 }
1649
1650 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1651 {
1652   int status;
1653   char *cmd_str;
1654   char *resp_txt;
1655   struct command *help = NULL;
1656
1657   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1658   if (status == 0)
1659     help = find_command(cmd_str);
1660
1661   if (help && (help->syntax || help->help))
1662   {
1663     char tmp[CMD_MAX];
1664
1665     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1666     resp_txt = tmp;
1667
1668     if (help->syntax)
1669       add_response_info(sock, "Usage: %s\n", help->syntax);
1670
1671     if (help->help)
1672       add_response_info(sock, "%s\n", help->help);
1673   }
1674   else
1675   {
1676     help = COMMANDS;
1677     resp_txt = "Command overview\n";
1678
1679     while (help->cmd)
1680     {
1681       if (help->syntax)
1682         add_response_info(sock, "%s", help->syntax);
1683       help++;
1684     }
1685   }
1686
1687   return send_response(sock, RESP_OK, resp_txt);
1688 } /* }}} int handle_request_help */
1689
1690 /* if sock==NULL, we are in journal replay mode */
1691 static int handle_request (DISPATCH_PROTO) /* {{{ */
1692 {
1693   char *buffer_ptr = buffer;
1694   char *cmd_str = NULL;
1695   struct command *cmd = NULL;
1696   int status;
1697
1698   assert (buffer[buffer_size - 1] == '\0');
1699
1700   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1701   if (status != 0)
1702   {
1703     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1704     return (-1);
1705   }
1706
1707   if (sock != NULL && sock->batch_start)
1708     sock->batch_cmd++;
1709
1710   cmd = find_command(cmd_str);
1711   if (!cmd)
1712     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1713
1714   status = has_privilege(sock, cmd->min_priv);
1715   if (status <= 0)
1716     return status;
1717
1718   if (!command_check_context(sock, cmd))
1719     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1720
1721   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1722 } /* }}} int handle_request */
1723
1724 /* MUST NOT hold journal_lock before calling this */
1725 static void journal_rotate(void) /* {{{ */
1726 {
1727   FILE *old_fh = NULL;
1728   int new_fd;
1729
1730   if (journal_cur == NULL || journal_old == NULL)
1731     return;
1732
1733   pthread_mutex_lock(&journal_lock);
1734
1735   /* we rotate this way (rename before close) so that the we can release
1736    * the journal lock as fast as possible.  Journal writes to the new
1737    * journal can proceed immediately after the new file is opened.  The
1738    * fclose can then block without affecting new updates.
1739    */
1740   if (journal_fh != NULL)
1741   {
1742     old_fh = journal_fh;
1743     journal_fh = NULL;
1744     rename(journal_cur, journal_old);
1745     ++stats_journal_rotate;
1746   }
1747
1748   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1749                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1750   if (new_fd >= 0)
1751   {
1752     journal_fh = fdopen(new_fd, "a");
1753     if (journal_fh == NULL)
1754       close(new_fd);
1755   }
1756
1757   pthread_mutex_unlock(&journal_lock);
1758
1759   if (old_fh != NULL)
1760     fclose(old_fh);
1761
1762   if (journal_fh == NULL)
1763   {
1764     RRDD_LOG(LOG_CRIT,
1765              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1766              journal_cur, rrd_strerror(errno));
1767
1768     RRDD_LOG(LOG_ERR,
1769              "JOURNALING DISABLED: All values will be flushed at shutdown");
1770     config_flush_at_shutdown = 1;
1771   }
1772
1773 } /* }}} static void journal_rotate */
1774
1775 static void journal_done(void) /* {{{ */
1776 {
1777   if (journal_cur == NULL)
1778     return;
1779
1780   pthread_mutex_lock(&journal_lock);
1781   if (journal_fh != NULL)
1782   {
1783     fclose(journal_fh);
1784     journal_fh = NULL;
1785   }
1786
1787   if (config_flush_at_shutdown)
1788   {
1789     RRDD_LOG(LOG_INFO, "removing journals");
1790     unlink(journal_old);
1791     unlink(journal_cur);
1792   }
1793   else
1794   {
1795     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1796              "journals will be used at next startup");
1797   }
1798
1799   pthread_mutex_unlock(&journal_lock);
1800
1801 } /* }}} static void journal_done */
1802
1803 static int journal_write(char *cmd, char *args) /* {{{ */
1804 {
1805   int chars;
1806
1807   if (journal_fh == NULL)
1808     return 0;
1809
1810   pthread_mutex_lock(&journal_lock);
1811   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1812   pthread_mutex_unlock(&journal_lock);
1813
1814   if (chars > 0)
1815   {
1816     pthread_mutex_lock(&stats_lock);
1817     stats_journal_bytes += chars;
1818     pthread_mutex_unlock(&stats_lock);
1819   }
1820
1821   return chars;
1822 } /* }}} static int journal_write */
1823
1824 static int journal_replay (const char *file) /* {{{ */
1825 {
1826   FILE *fh;
1827   int entry_cnt = 0;
1828   int fail_cnt = 0;
1829   uint64_t line = 0;
1830   char entry[CMD_MAX];
1831   time_t now;
1832
1833   if (file == NULL) return 0;
1834
1835   {
1836     char *reason = "unknown error";
1837     int status = 0;
1838     struct stat statbuf;
1839
1840     memset(&statbuf, 0, sizeof(statbuf));
1841     if (stat(file, &statbuf) != 0)
1842     {
1843       if (errno == ENOENT)
1844         return 0;
1845
1846       reason = "stat error";
1847       status = errno;
1848     }
1849     else if (!S_ISREG(statbuf.st_mode))
1850     {
1851       reason = "not a regular file";
1852       status = EPERM;
1853     }
1854     if (statbuf.st_uid != daemon_uid)
1855     {
1856       reason = "not owned by daemon user";
1857       status = EACCES;
1858     }
1859     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1860     {
1861       reason = "must not be user/group writable";
1862       status = EACCES;
1863     }
1864
1865     if (status != 0)
1866     {
1867       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1868                file, rrd_strerror(status), reason);
1869       return 0;
1870     }
1871   }
1872
1873   fh = fopen(file, "r");
1874   if (fh == NULL)
1875   {
1876     if (errno != ENOENT)
1877       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1878                file, rrd_strerror(errno));
1879     return 0;
1880   }
1881   else
1882     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1883
1884   now = time(NULL);
1885
1886   while(!feof(fh))
1887   {
1888     size_t entry_len;
1889
1890     ++line;
1891     if (fgets(entry, sizeof(entry), fh) == NULL)
1892       break;
1893     entry_len = strlen(entry);
1894
1895     /* check \n termination in case journal writing crashed mid-line */
1896     if (entry_len == 0)
1897       continue;
1898     else if (entry[entry_len - 1] != '\n')
1899     {
1900       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1901       ++fail_cnt;
1902       continue;
1903     }
1904
1905     entry[entry_len - 1] = '\0';
1906
1907     if (handle_request(NULL, now, entry, entry_len) == 0)
1908       ++entry_cnt;
1909     else
1910       ++fail_cnt;
1911   }
1912
1913   fclose(fh);
1914
1915   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1916            entry_cnt, fail_cnt);
1917
1918   return entry_cnt > 0 ? 1 : 0;
1919 } /* }}} static int journal_replay */
1920
1921 static void journal_init(void) /* {{{ */
1922 {
1923   int had_journal = 0;
1924
1925   if (journal_cur == NULL) return;
1926
1927   pthread_mutex_lock(&journal_lock);
1928
1929   RRDD_LOG(LOG_INFO, "checking for journal files");
1930
1931   had_journal += journal_replay(journal_old);
1932   had_journal += journal_replay(journal_cur);
1933
1934   /* it must have been a crash.  start a flush */
1935   if (had_journal && config_flush_at_shutdown)
1936     flush_old_values(-1);
1937
1938   pthread_mutex_unlock(&journal_lock);
1939   journal_rotate();
1940
1941   RRDD_LOG(LOG_INFO, "journal processing complete");
1942
1943 } /* }}} static void journal_init */
1944
1945 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1946 {
1947   assert(sock != NULL);
1948
1949   free(sock->rbuf);  sock->rbuf = NULL;
1950   free(sock->wbuf);  sock->wbuf = NULL;
1951   free(sock);
1952 } /* }}} void free_listen_socket */
1953
1954 static void close_connection(listen_socket_t *sock) /* {{{ */
1955 {
1956   if (sock->fd >= 0)
1957   {
1958     close(sock->fd);
1959     sock->fd = -1;
1960   }
1961
1962   free_listen_socket(sock);
1963
1964 } /* }}} void close_connection */
1965
1966 static void *connection_thread_main (void *args) /* {{{ */
1967 {
1968   listen_socket_t *sock;
1969   int fd;
1970
1971   sock = (listen_socket_t *) args;
1972   fd = sock->fd;
1973
1974   /* init read buffers */
1975   sock->next_read = sock->next_cmd = 0;
1976   sock->rbuf = malloc(RBUF_SIZE);
1977   if (sock->rbuf == NULL)
1978   {
1979     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1980     close_connection(sock);
1981     return NULL;
1982   }
1983
1984   pthread_mutex_lock (&connection_threads_lock);
1985   connection_threads_num++;
1986   pthread_mutex_unlock (&connection_threads_lock);
1987
1988   while (do_shutdown == 0)
1989   {
1990     char *cmd;
1991     ssize_t cmd_len;
1992     ssize_t rbytes;
1993     time_t now;
1994
1995     struct pollfd pollfd;
1996     int status;
1997
1998     pollfd.fd = fd;
1999     pollfd.events = POLLIN | POLLPRI;
2000     pollfd.revents = 0;
2001
2002     status = poll (&pollfd, 1, /* timeout = */ 500);
2003     if (do_shutdown)
2004       break;
2005     else if (status == 0) /* timeout */
2006       continue;
2007     else if (status < 0) /* error */
2008     {
2009       status = errno;
2010       if (status != EINTR)
2011         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2012       continue;
2013     }
2014
2015     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2016       break;
2017     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2018     {
2019       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2020           "poll(2) returned something unexpected: %#04hx",
2021           pollfd.revents);
2022       break;
2023     }
2024
2025     rbytes = read(fd, sock->rbuf + sock->next_read,
2026                   RBUF_SIZE - sock->next_read);
2027     if (rbytes < 0)
2028     {
2029       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2030       break;
2031     }
2032     else if (rbytes == 0)
2033       break; /* eof */
2034
2035     sock->next_read += rbytes;
2036
2037     if (sock->batch_start)
2038       now = sock->batch_start;
2039     else
2040       now = time(NULL);
2041
2042     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2043     {
2044       status = handle_request (sock, now, cmd, cmd_len+1);
2045       if (status != 0)
2046         goto out_close;
2047     }
2048   }
2049
2050 out_close:
2051   close_connection(sock);
2052
2053   /* Remove this thread from the connection threads list */
2054   pthread_mutex_lock (&connection_threads_lock);
2055   connection_threads_num--;
2056   if (connection_threads_num <= 0)
2057     pthread_cond_broadcast(&connection_threads_done);
2058   pthread_mutex_unlock (&connection_threads_lock);
2059
2060   return (NULL);
2061 } /* }}} void *connection_thread_main */
2062
2063 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2064 {
2065   int fd;
2066   struct sockaddr_un sa;
2067   listen_socket_t *temp;
2068   int status;
2069   const char *path;
2070
2071   path = sock->addr;
2072   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2073     path += strlen("unix:");
2074
2075   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2076       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2077   if (temp == NULL)
2078   {
2079     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2080     return (-1);
2081   }
2082   listen_fds = temp;
2083   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2084
2085   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2086   if (fd < 0)
2087   {
2088     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2089              rrd_strerror(errno));
2090     return (-1);
2091   }
2092
2093   memset (&sa, 0, sizeof (sa));
2094   sa.sun_family = AF_UNIX;
2095   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2096
2097   /* if we've gotten this far, we own the pid file.  any daemon started
2098    * with the same args must not be alive.  therefore, ensure that we can
2099    * create the socket...
2100    */
2101   unlink(path);
2102
2103   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2104   if (status != 0)
2105   {
2106     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2107              path, rrd_strerror(errno));
2108     close (fd);
2109     return (-1);
2110   }
2111
2112   status = listen (fd, /* backlog = */ 10);
2113   if (status != 0)
2114   {
2115     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2116              path, rrd_strerror(errno));
2117     close (fd);
2118     unlink (path);
2119     return (-1);
2120   }
2121
2122   listen_fds[listen_fds_num].fd = fd;
2123   listen_fds[listen_fds_num].family = PF_UNIX;
2124   strncpy(listen_fds[listen_fds_num].addr, path,
2125           sizeof (listen_fds[listen_fds_num].addr) - 1);
2126   listen_fds_num++;
2127
2128   return (0);
2129 } /* }}} int open_listen_socket_unix */
2130
2131 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2132 {
2133   struct addrinfo ai_hints;
2134   struct addrinfo *ai_res;
2135   struct addrinfo *ai_ptr;
2136   char addr_copy[NI_MAXHOST];
2137   char *addr;
2138   char *port;
2139   int status;
2140
2141   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2142   addr_copy[sizeof (addr_copy) - 1] = 0;
2143   addr = addr_copy;
2144
2145   memset (&ai_hints, 0, sizeof (ai_hints));
2146   ai_hints.ai_flags = 0;
2147 #ifdef AI_ADDRCONFIG
2148   ai_hints.ai_flags |= AI_ADDRCONFIG;
2149 #endif
2150   ai_hints.ai_family = AF_UNSPEC;
2151   ai_hints.ai_socktype = SOCK_STREAM;
2152
2153   port = NULL;
2154   if (*addr == '[') /* IPv6+port format */
2155   {
2156     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2157     addr++;
2158
2159     port = strchr (addr, ']');
2160     if (port == NULL)
2161     {
2162       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2163       return (-1);
2164     }
2165     *port = 0;
2166     port++;
2167
2168     if (*port == ':')
2169       port++;
2170     else if (*port == 0)
2171       port = NULL;
2172     else
2173     {
2174       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2175       return (-1);
2176     }
2177   } /* if (*addr = ']') */
2178   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2179   {
2180     port = rindex(addr, ':');
2181     if (port != NULL)
2182     {
2183       *port = 0;
2184       port++;
2185     }
2186   }
2187   ai_res = NULL;
2188   status = getaddrinfo (addr,
2189                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2190                         &ai_hints, &ai_res);
2191   if (status != 0)
2192   {
2193     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2194              addr, gai_strerror (status));
2195     return (-1);
2196   }
2197
2198   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2199   {
2200     int fd;
2201     listen_socket_t *temp;
2202     int one = 1;
2203
2204     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2205         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2206     if (temp == NULL)
2207     {
2208       fprintf (stderr,
2209                "rrdcached: open_listen_socket_network: realloc failed.\n");
2210       continue;
2211     }
2212     listen_fds = temp;
2213     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2214
2215     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2216     if (fd < 0)
2217     {
2218       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2219                rrd_strerror(errno));
2220       continue;
2221     }
2222
2223     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2224
2225     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2226     if (status != 0)
2227     {
2228       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2229                sock->addr, rrd_strerror(errno));
2230       close (fd);
2231       continue;
2232     }
2233
2234     status = listen (fd, /* backlog = */ 10);
2235     if (status != 0)
2236     {
2237       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2238                sock->addr, rrd_strerror(errno));
2239       close (fd);
2240       freeaddrinfo(ai_res);
2241       return (-1);
2242     }
2243
2244     listen_fds[listen_fds_num].fd = fd;
2245     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2246     listen_fds_num++;
2247   } /* for (ai_ptr) */
2248
2249   freeaddrinfo(ai_res);
2250   return (0);
2251 } /* }}} static int open_listen_socket_network */
2252
2253 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2254 {
2255   assert(sock != NULL);
2256   assert(sock->addr != NULL);
2257
2258   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2259       || sock->addr[0] == '/')
2260     return (open_listen_socket_unix(sock));
2261   else
2262     return (open_listen_socket_network(sock));
2263 } /* }}} int open_listen_socket */
2264
2265 static int close_listen_sockets (void) /* {{{ */
2266 {
2267   size_t i;
2268
2269   for (i = 0; i < listen_fds_num; i++)
2270   {
2271     close (listen_fds[i].fd);
2272
2273     if (listen_fds[i].family == PF_UNIX)
2274       unlink(listen_fds[i].addr);
2275   }
2276
2277   free (listen_fds);
2278   listen_fds = NULL;
2279   listen_fds_num = 0;
2280
2281   return (0);
2282 } /* }}} int close_listen_sockets */
2283
2284 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2285 {
2286   struct pollfd *pollfds;
2287   int pollfds_num;
2288   int status;
2289   int i;
2290
2291   if (listen_fds_num < 1)
2292   {
2293     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2294     return (NULL);
2295   }
2296
2297   pollfds_num = listen_fds_num;
2298   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2299   if (pollfds == NULL)
2300   {
2301     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2302     return (NULL);
2303   }
2304   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2305
2306   RRDD_LOG(LOG_INFO, "listening for connections");
2307
2308   while (do_shutdown == 0)
2309   {
2310     for (i = 0; i < pollfds_num; i++)
2311     {
2312       pollfds[i].fd = listen_fds[i].fd;
2313       pollfds[i].events = POLLIN | POLLPRI;
2314       pollfds[i].revents = 0;
2315     }
2316
2317     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2318     if (do_shutdown)
2319       break;
2320     else if (status == 0) /* timeout */
2321       continue;
2322     else if (status < 0) /* error */
2323     {
2324       status = errno;
2325       if (status != EINTR)
2326       {
2327         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2328       }
2329       continue;
2330     }
2331
2332     for (i = 0; i < pollfds_num; i++)
2333     {
2334       listen_socket_t *client_sock;
2335       struct sockaddr_storage client_sa;
2336       socklen_t client_sa_size;
2337       pthread_t tid;
2338       pthread_attr_t attr;
2339
2340       if (pollfds[i].revents == 0)
2341         continue;
2342
2343       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2344       {
2345         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2346             "poll(2) returned something unexpected for listen FD #%i.",
2347             pollfds[i].fd);
2348         continue;
2349       }
2350
2351       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2352       if (client_sock == NULL)
2353       {
2354         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2355         continue;
2356       }
2357       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2358
2359       client_sa_size = sizeof (client_sa);
2360       client_sock->fd = accept (pollfds[i].fd,
2361           (struct sockaddr *) &client_sa, &client_sa_size);
2362       if (client_sock->fd < 0)
2363       {
2364         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2365         free(client_sock);
2366         continue;
2367       }
2368
2369       pthread_attr_init (&attr);
2370       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2371
2372       status = pthread_create (&tid, &attr, connection_thread_main,
2373                                client_sock);
2374       if (status != 0)
2375       {
2376         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2377         close_connection(client_sock);
2378         continue;
2379       }
2380     } /* for (pollfds_num) */
2381   } /* while (do_shutdown == 0) */
2382
2383   RRDD_LOG(LOG_INFO, "starting shutdown");
2384
2385   close_listen_sockets ();
2386
2387   pthread_mutex_lock (&connection_threads_lock);
2388   while (connection_threads_num > 0)
2389     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2390   pthread_mutex_unlock (&connection_threads_lock);
2391
2392   free(pollfds);
2393
2394   return (NULL);
2395 } /* }}} void *listen_thread_main */
2396
2397 static int daemonize (void) /* {{{ */
2398 {
2399   int pid_fd;
2400   char *base_dir;
2401
2402   daemon_uid = geteuid();
2403
2404   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2405   if (pid_fd < 0)
2406     pid_fd = check_pidfile();
2407   if (pid_fd < 0)
2408     return pid_fd;
2409
2410   /* open all the listen sockets */
2411   if (config_listen_address_list_len > 0)
2412   {
2413     for (size_t i = 0; i < config_listen_address_list_len; i++)
2414       open_listen_socket (config_listen_address_list[i]);
2415
2416     rrd_free_ptrs((void ***) &config_listen_address_list,
2417                   &config_listen_address_list_len);
2418   }
2419   else
2420   {
2421     listen_socket_t sock;
2422     memset(&sock, 0, sizeof(sock));
2423     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2424     open_listen_socket (&sock);
2425   }
2426
2427   if (listen_fds_num < 1)
2428   {
2429     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2430     goto error;
2431   }
2432
2433   if (!stay_foreground)
2434   {
2435     pid_t child;
2436
2437     child = fork ();
2438     if (child < 0)
2439     {
2440       fprintf (stderr, "daemonize: fork(2) failed.\n");
2441       goto error;
2442     }
2443     else if (child > 0)
2444       exit(0);
2445
2446     /* Become session leader */
2447     setsid ();
2448
2449     /* Open the first three file descriptors to /dev/null */
2450     close (2);
2451     close (1);
2452     close (0);
2453
2454     open ("/dev/null", O_RDWR);
2455     if (dup(0) == -1 || dup(0) == -1){
2456         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2457     }
2458   } /* if (!stay_foreground) */
2459
2460   /* Change into the /tmp directory. */
2461   base_dir = (config_base_dir != NULL)
2462     ? config_base_dir
2463     : "/tmp";
2464
2465   if (chdir (base_dir) != 0)
2466   {
2467     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2468     goto error;
2469   }
2470
2471   install_signal_handlers();
2472
2473   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2474   RRDD_LOG(LOG_INFO, "starting up");
2475
2476   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2477                                 (GDestroyNotify) free_cache_item);
2478   if (cache_tree == NULL)
2479   {
2480     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2481     goto error;
2482   }
2483
2484   return write_pidfile (pid_fd);
2485
2486 error:
2487   remove_pidfile();
2488   return -1;
2489 } /* }}} int daemonize */
2490
2491 static int cleanup (void) /* {{{ */
2492 {
2493   do_shutdown++;
2494
2495   pthread_cond_broadcast (&flush_cond);
2496   pthread_join (flush_thread, NULL);
2497
2498   pthread_cond_broadcast (&queue_cond);
2499   for (int i = 0; i < config_queue_threads; i++)
2500     pthread_join (queue_threads[i], NULL);
2501
2502   if (config_flush_at_shutdown)
2503   {
2504     assert(cache_queue_head == NULL);
2505     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2506   }
2507
2508   journal_done();
2509   remove_pidfile ();
2510
2511   free(queue_threads);
2512   free(config_base_dir);
2513   free(config_pid_file);
2514   free(journal_cur);
2515   free(journal_old);
2516
2517   pthread_mutex_lock(&cache_lock);
2518   g_tree_destroy(cache_tree);
2519
2520   RRDD_LOG(LOG_INFO, "goodbye");
2521   closelog ();
2522
2523   return (0);
2524 } /* }}} int cleanup */
2525
2526 static int read_options (int argc, char **argv) /* {{{ */
2527 {
2528   int option;
2529   int status = 0;
2530
2531   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2532   {
2533     switch (option)
2534     {
2535       case 'g':
2536         stay_foreground=1;
2537         break;
2538
2539       case 'L':
2540       case 'l':
2541       {
2542         listen_socket_t *new;
2543
2544         new = malloc(sizeof(listen_socket_t));
2545         if (new == NULL)
2546         {
2547           fprintf(stderr, "read_options: malloc failed.\n");
2548           return(2);
2549         }
2550         memset(new, 0, sizeof(listen_socket_t));
2551
2552         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2553         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2554
2555         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2556                          &config_listen_address_list_len, new))
2557         {
2558           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2559           return (2);
2560         }
2561       }
2562       break;
2563
2564       case 'f':
2565       {
2566         int temp;
2567
2568         temp = atoi (optarg);
2569         if (temp > 0)
2570           config_flush_interval = temp;
2571         else
2572         {
2573           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2574           status = 3;
2575         }
2576       }
2577       break;
2578
2579       case 'w':
2580       {
2581         int temp;
2582
2583         temp = atoi (optarg);
2584         if (temp > 0)
2585           config_write_interval = temp;
2586         else
2587         {
2588           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2589           status = 2;
2590         }
2591       }
2592       break;
2593
2594       case 'z':
2595       {
2596         int temp;
2597
2598         temp = atoi(optarg);
2599         if (temp > 0)
2600           config_write_jitter = temp;
2601         else
2602         {
2603           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2604           status = 2;
2605         }
2606
2607         break;
2608       }
2609
2610       case 't':
2611       {
2612         int threads;
2613         threads = atoi(optarg);
2614         if (threads >= 1)
2615           config_queue_threads = threads;
2616         else
2617         {
2618           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2619           return 1;
2620         }
2621       }
2622       break;
2623
2624       case 'B':
2625         config_write_base_only = 1;
2626         break;
2627
2628       case 'b':
2629       {
2630         size_t len;
2631         char base_realpath[PATH_MAX];
2632
2633         if (config_base_dir != NULL)
2634           free (config_base_dir);
2635         config_base_dir = strdup (optarg);
2636         if (config_base_dir == NULL)
2637         {
2638           fprintf (stderr, "read_options: strdup failed.\n");
2639           return (3);
2640         }
2641
2642         /* make sure that the base directory is not resolved via
2643          * symbolic links.  this makes some performance-enhancing
2644          * assumptions possible (we don't have to resolve paths
2645          * that start with a "/")
2646          */
2647         if (realpath(config_base_dir, base_realpath) == NULL)
2648         {
2649           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2650           return 5;
2651         }
2652         else if (strncmp(config_base_dir,
2653                          base_realpath, sizeof(base_realpath)) != 0)
2654         {
2655           fprintf(stderr,
2656                   "Base directory (-b) resolved via file system links!\n"
2657                   "Please consult rrdcached '-b' documentation!\n"
2658                   "Consider specifying the real directory (%s)\n",
2659                   base_realpath);
2660           return 5;
2661         }
2662
2663         len = strlen (config_base_dir);
2664         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2665         {
2666           config_base_dir[len - 1] = 0;
2667           len--;
2668         }
2669
2670         if (len < 1)
2671         {
2672           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2673           return (4);
2674         }
2675
2676         _config_base_dir_len = len;
2677       }
2678       break;
2679
2680       case 'p':
2681       {
2682         if (config_pid_file != NULL)
2683           free (config_pid_file);
2684         config_pid_file = strdup (optarg);
2685         if (config_pid_file == NULL)
2686         {
2687           fprintf (stderr, "read_options: strdup failed.\n");
2688           return (3);
2689         }
2690       }
2691       break;
2692
2693       case 'F':
2694         config_flush_at_shutdown = 1;
2695         break;
2696
2697       case 'j':
2698       {
2699         struct stat statbuf;
2700         const char *dir = optarg;
2701
2702         status = stat(dir, &statbuf);
2703         if (status != 0)
2704         {
2705           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2706           return 6;
2707         }
2708
2709         if (!S_ISDIR(statbuf.st_mode)
2710             || access(dir, R_OK|W_OK|X_OK) != 0)
2711         {
2712           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2713                   errno ? rrd_strerror(errno) : "");
2714           return 6;
2715         }
2716
2717         journal_cur = malloc(PATH_MAX + 1);
2718         journal_old = malloc(PATH_MAX + 1);
2719         if (journal_cur == NULL || journal_old == NULL)
2720         {
2721           fprintf(stderr, "malloc failure for journal files\n");
2722           return 6;
2723         }
2724         else 
2725         {
2726           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2727           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2728         }
2729       }
2730       break;
2731
2732       case 'h':
2733       case '?':
2734         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2735             "\n"
2736             "Usage: rrdcached [options]\n"
2737             "\n"
2738             "Valid options are:\n"
2739             "  -l <address>  Socket address to listen to.\n"
2740             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2741             "  -w <seconds>  Interval in which to write data.\n"
2742             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2743             "  -t <threads>  Number of write threads.\n"
2744             "  -f <seconds>  Interval in which to flush dead data.\n"
2745             "  -p <file>     Location of the PID-file.\n"
2746             "  -b <dir>      Base directory to change to.\n"
2747             "  -B            Restrict file access to paths within -b <dir>\n"
2748             "  -g            Do not fork and run in the foreground.\n"
2749             "  -j <dir>      Directory in which to create the journal files.\n"
2750             "  -F            Always flush all updates at shutdown\n"
2751             "\n"
2752             "For more information and a detailed description of all options "
2753             "please refer\n"
2754             "to the rrdcached(1) manual page.\n",
2755             VERSION);
2756         status = -1;
2757         break;
2758     } /* switch (option) */
2759   } /* while (getopt) */
2760
2761   /* advise the user when values are not sane */
2762   if (config_flush_interval < 2 * config_write_interval)
2763     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2764             " 2x write interval (-w) !\n");
2765   if (config_write_jitter > config_write_interval)
2766     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2767             " write interval (-w) !\n");
2768
2769   if (config_write_base_only && config_base_dir == NULL)
2770     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2771             "  Consult the rrdcached documentation\n");
2772
2773   if (journal_cur == NULL)
2774     config_flush_at_shutdown = 1;
2775
2776   return (status);
2777 } /* }}} int read_options */
2778
2779 int main (int argc, char **argv)
2780 {
2781   int status;
2782
2783   status = read_options (argc, argv);
2784   if (status != 0)
2785   {
2786     if (status < 0)
2787       status = 0;
2788     return (status);
2789   }
2790
2791   status = daemonize ();
2792   if (status != 0)
2793   {
2794     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2795     return (1);
2796   }
2797
2798   journal_init();
2799
2800   /* start the queue threads */
2801   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2802   if (queue_threads == NULL)
2803   {
2804     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2805     cleanup();
2806     return (1);
2807   }
2808   for (int i = 0; i < config_queue_threads; i++)
2809   {
2810     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2811     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2812     if (status != 0)
2813     {
2814       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2815       cleanup();
2816       return (1);
2817     }
2818   }
2819
2820   /* start the flush thread */
2821   memset(&flush_thread, 0, sizeof(flush_thread));
2822   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2823   if (status != 0)
2824   {
2825     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2826     cleanup();
2827     return (1);
2828   }
2829
2830   listen_thread_main (NULL);
2831   cleanup ();
2832
2833   return (0);
2834 } /* int main */
2835
2836 /*
2837  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2838  */