Two-phase shutdown for rrdcached ensures that values are flushed.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 enum {
219   RUNNING,              /* normal operation */
220   FLUSHING,             /* flushing remaining values */
221   SHUTDOWN              /* shutting down */
222 } state = RUNNING;
223
224 static pthread_t *queue_threads;
225 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
226 static int config_queue_threads = 4;
227
228 static pthread_t flush_thread;
229 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
230
231 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
232 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
233 static int connection_threads_num = 0;
234
235 /* Cache stuff */
236 static GTree          *cache_tree = NULL;
237 static cache_item_t   *cache_queue_head = NULL;
238 static cache_item_t   *cache_queue_tail = NULL;
239 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
240
241 static int config_write_interval = 300;
242 static int config_write_jitter   = 0;
243 static int config_flush_interval = 3600;
244 static int config_flush_at_shutdown = 0;
245 static char *config_pid_file = NULL;
246 static char *config_base_dir = NULL;
247 static size_t _config_base_dir_len = 0;
248 static int config_write_base_only = 0;
249
250 static listen_socket_t **config_listen_address_list = NULL;
251 static size_t config_listen_address_list_len = 0;
252
253 static uint64_t stats_queue_length = 0;
254 static uint64_t stats_updates_received = 0;
255 static uint64_t stats_flush_received = 0;
256 static uint64_t stats_updates_written = 0;
257 static uint64_t stats_data_sets_written = 0;
258 static uint64_t stats_journal_bytes = 0;
259 static uint64_t stats_journal_rotate = 0;
260 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
261
262 /* Journaled updates */
263 static char *journal_cur = NULL;
264 static char *journal_old = NULL;
265 static FILE *journal_fh = NULL;
266 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
267 static int journal_write(char *cmd, char *args);
268 static void journal_done(void);
269 static void journal_rotate(void);
270
271 /* prototypes for forward refernces */
272 static int handle_request_help (HANDLER_PROTO);
273
274 /* 
275  * Functions
276  */
277 static void sig_common (const char *sig) /* {{{ */
278 {
279   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
280   state = FLUSHING;
281   pthread_cond_broadcast(&flush_cond);
282   pthread_cond_broadcast(&queue_cond);
283 } /* }}} void sig_common */
284
285 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
286 {
287   sig_common("INT");
288 } /* }}} void sig_int_handler */
289
290 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
291 {
292   sig_common("TERM");
293 } /* }}} void sig_term_handler */
294
295 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
296 {
297   config_flush_at_shutdown = 1;
298   sig_common("USR1");
299 } /* }}} void sig_usr1_handler */
300
301 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
302 {
303   config_flush_at_shutdown = 0;
304   sig_common("USR2");
305 } /* }}} void sig_usr2_handler */
306
307 static void install_signal_handlers(void) /* {{{ */
308 {
309   /* These structures are static, because `sigaction' behaves weird if the are
310    * overwritten.. */
311   static struct sigaction sa_int;
312   static struct sigaction sa_term;
313   static struct sigaction sa_pipe;
314   static struct sigaction sa_usr1;
315   static struct sigaction sa_usr2;
316
317   /* Install signal handlers */
318   memset (&sa_int, 0, sizeof (sa_int));
319   sa_int.sa_handler = sig_int_handler;
320   sigaction (SIGINT, &sa_int, NULL);
321
322   memset (&sa_term, 0, sizeof (sa_term));
323   sa_term.sa_handler = sig_term_handler;
324   sigaction (SIGTERM, &sa_term, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_pipe));
327   sa_pipe.sa_handler = SIG_IGN;
328   sigaction (SIGPIPE, &sa_pipe, NULL);
329
330   memset (&sa_pipe, 0, sizeof (sa_usr1));
331   sa_usr1.sa_handler = sig_usr1_handler;
332   sigaction (SIGUSR1, &sa_usr1, NULL);
333
334   memset (&sa_usr2, 0, sizeof (sa_usr2));
335   sa_usr2.sa_handler = sig_usr2_handler;
336   sigaction (SIGUSR2, &sa_usr2, NULL);
337
338 } /* }}} void install_signal_handlers */
339
340 static int open_pidfile(char *action, int oflag) /* {{{ */
341 {
342   int fd;
343   char *file;
344
345   file = (config_pid_file != NULL)
346     ? config_pid_file
347     : LOCALSTATEDIR "/run/rrdcached.pid";
348
349   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
350   if (fd < 0)
351     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
352             action, file, rrd_strerror(errno));
353
354   return(fd);
355 } /* }}} static int open_pidfile */
356
357 /* check existing pid file to see whether a daemon is running */
358 static int check_pidfile(void)
359 {
360   int pid_fd;
361   pid_t pid;
362   char pid_str[16];
363
364   pid_fd = open_pidfile("open", O_RDWR);
365   if (pid_fd < 0)
366     return pid_fd;
367
368   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
369     return -1;
370
371   pid = atoi(pid_str);
372   if (pid <= 0)
373     return -1;
374
375   /* another running process that we can signal COULD be
376    * a competing rrdcached */
377   if (pid != getpid() && kill(pid, 0) == 0)
378   {
379     fprintf(stderr,
380             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
381     close(pid_fd);
382     return -1;
383   }
384
385   lseek(pid_fd, 0, SEEK_SET);
386   if (ftruncate(pid_fd, 0) == -1)
387   {
388     fprintf(stderr,
389             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
390     close(pid_fd);
391     return -1;
392   }
393
394   fprintf(stderr,
395           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
396           "rrdcached: starting normally.\n", pid);
397
398   return pid_fd;
399 } /* }}} static int check_pidfile */
400
401 static int write_pidfile (int fd) /* {{{ */
402 {
403   pid_t pid;
404   FILE *fh;
405
406   pid = getpid ();
407
408   fh = fdopen (fd, "w");
409   if (fh == NULL)
410   {
411     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
412     close(fd);
413     return (-1);
414   }
415
416   fprintf (fh, "%i\n", (int) pid);
417   fclose (fh);
418
419   return (0);
420 } /* }}} int write_pidfile */
421
422 static int remove_pidfile (void) /* {{{ */
423 {
424   char *file;
425   int status;
426
427   file = (config_pid_file != NULL)
428     ? config_pid_file
429     : LOCALSTATEDIR "/run/rrdcached.pid";
430
431   status = unlink (file);
432   if (status == 0)
433     return (0);
434   return (errno);
435 } /* }}} int remove_pidfile */
436
437 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
438 {
439   char *eol;
440
441   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
442                sock->next_read - sock->next_cmd);
443
444   if (eol == NULL)
445   {
446     /* no commands left, move remainder back to front of rbuf */
447     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
448             sock->next_read - sock->next_cmd);
449     sock->next_read -= sock->next_cmd;
450     sock->next_cmd = 0;
451     *len = 0;
452     return NULL;
453   }
454   else
455   {
456     char *cmd = sock->rbuf + sock->next_cmd;
457     *eol = '\0';
458
459     sock->next_cmd = eol - sock->rbuf + 1;
460
461     if (eol > sock->rbuf && *(eol-1) == '\r')
462       *(--eol) = '\0'; /* handle "\r\n" EOL */
463
464     *len = eol - cmd;
465
466     return cmd;
467   }
468
469   /* NOTREACHED */
470   assert(1==0);
471 }
472
473 /* add the characters directly to the write buffer */
474 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
475 {
476   char *new_buf;
477
478   assert(sock != NULL);
479
480   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
481   if (new_buf == NULL)
482   {
483     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
484     return -1;
485   }
486
487   strncpy(new_buf + sock->wbuf_len, str, len + 1);
488
489   sock->wbuf = new_buf;
490   sock->wbuf_len += len;
491
492   return 0;
493 } /* }}} static int add_to_wbuf */
494
495 /* add the text to the "extra" info that's sent after the status line */
496 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
497 {
498   va_list argp;
499   char buffer[CMD_MAX];
500   int len;
501
502   if (sock == NULL) return 0; /* journal replay mode */
503   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
504
505   va_start(argp, fmt);
506 #ifdef HAVE_VSNPRINTF
507   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
508 #else
509   len = vsprintf(buffer, fmt, argp);
510 #endif
511   va_end(argp);
512   if (len < 0)
513   {
514     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
515     return -1;
516   }
517
518   return add_to_wbuf(sock, buffer, len);
519 } /* }}} static int add_response_info */
520
521 static int count_lines(char *str) /* {{{ */
522 {
523   int lines = 0;
524
525   if (str != NULL)
526   {
527     while ((str = strchr(str, '\n')) != NULL)
528     {
529       ++lines;
530       ++str;
531     }
532   }
533
534   return lines;
535 } /* }}} static int count_lines */
536
537 /* send the response back to the user.
538  * returns 0 on success, -1 on error
539  * write buffer is always zeroed after this call */
540 static int send_response (listen_socket_t *sock, response_code rc,
541                           char *fmt, ...) /* {{{ */
542 {
543   va_list argp;
544   char buffer[CMD_MAX];
545   int lines;
546   ssize_t wrote;
547   int rclen, len;
548
549   if (sock == NULL) return rc;  /* journal replay mode */
550
551   if (sock->batch_start)
552   {
553     if (rc == RESP_OK)
554       return rc; /* no response on success during BATCH */
555     lines = sock->batch_cmd;
556   }
557   else if (rc == RESP_OK)
558     lines = count_lines(sock->wbuf);
559   else
560     lines = -1;
561
562   rclen = sprintf(buffer, "%d ", lines);
563   va_start(argp, fmt);
564 #ifdef HAVE_VSNPRINTF
565   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
566 #else
567   len = vsprintf(buffer+rclen, fmt, argp);
568 #endif
569   va_end(argp);
570   if (len < 0)
571     return -1;
572
573   len += rclen;
574
575   /* append the result to the wbuf, don't write to the user */
576   if (sock->batch_start)
577     return add_to_wbuf(sock, buffer, len);
578
579   /* first write must be complete */
580   if (len != write(sock->fd, buffer, len))
581   {
582     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
583     return -1;
584   }
585
586   if (sock->wbuf != NULL && rc == RESP_OK)
587   {
588     wrote = 0;
589     while (wrote < sock->wbuf_len)
590     {
591       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
592       if (wb <= 0)
593       {
594         RRDD_LOG(LOG_INFO, "send_response: could not write results");
595         return -1;
596       }
597       wrote += wb;
598     }
599   }
600
601   free(sock->wbuf); sock->wbuf = NULL;
602   sock->wbuf_len = 0;
603
604   return 0;
605 } /* }}} */
606
607 static void wipe_ci_values(cache_item_t *ci, time_t when)
608 {
609   ci->values = NULL;
610   ci->values_num = 0;
611
612   ci->last_flush_time = when;
613   if (config_write_jitter > 0)
614     ci->last_flush_time += (rrd_random() % config_write_jitter);
615 }
616
617 /* remove_from_queue
618  * remove a "cache_item_t" item from the queue.
619  * must hold 'cache_lock' when calling this
620  */
621 static void remove_from_queue(cache_item_t *ci) /* {{{ */
622 {
623   if (ci == NULL) return;
624   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
625
626   if (ci->prev == NULL)
627     cache_queue_head = ci->next; /* reset head */
628   else
629     ci->prev->next = ci->next;
630
631   if (ci->next == NULL)
632     cache_queue_tail = ci->prev; /* reset the tail */
633   else
634     ci->next->prev = ci->prev;
635
636   ci->next = ci->prev = NULL;
637   ci->flags &= ~CI_FLAGS_IN_QUEUE;
638
639   pthread_mutex_lock (&stats_lock);
640   assert (stats_queue_length > 0);
641   stats_queue_length--;
642   pthread_mutex_unlock (&stats_lock);
643
644 } /* }}} static void remove_from_queue */
645
646 /* free the resources associated with the cache_item_t
647  * must hold cache_lock when calling this function
648  */
649 static void *free_cache_item(cache_item_t *ci) /* {{{ */
650 {
651   if (ci == NULL) return NULL;
652
653   remove_from_queue(ci);
654
655   for (size_t i=0; i < ci->values_num; i++)
656     free(ci->values[i]);
657
658   free (ci->values);
659   free (ci->file);
660
661   /* in case anyone is waiting */
662   pthread_cond_broadcast(&ci->flushed);
663   pthread_cond_destroy(&ci->flushed);
664
665   free (ci);
666
667   return NULL;
668 } /* }}} static void *free_cache_item */
669
670 /*
671  * enqueue_cache_item:
672  * `cache_lock' must be acquired before calling this function!
673  */
674 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
675     queue_side_t side)
676 {
677   if (ci == NULL)
678     return (-1);
679
680   if (ci->values_num == 0)
681     return (0);
682
683   if (side == HEAD)
684   {
685     if (cache_queue_head == ci)
686       return 0;
687
688     /* remove if further down in queue */
689     remove_from_queue(ci);
690
691     ci->prev = NULL;
692     ci->next = cache_queue_head;
693     if (ci->next != NULL)
694       ci->next->prev = ci;
695     cache_queue_head = ci;
696
697     if (cache_queue_tail == NULL)
698       cache_queue_tail = cache_queue_head;
699   }
700   else /* (side == TAIL) */
701   {
702     /* We don't move values back in the list.. */
703     if (ci->flags & CI_FLAGS_IN_QUEUE)
704       return (0);
705
706     assert (ci->next == NULL);
707     assert (ci->prev == NULL);
708
709     ci->prev = cache_queue_tail;
710
711     if (cache_queue_tail == NULL)
712       cache_queue_head = ci;
713     else
714       cache_queue_tail->next = ci;
715
716     cache_queue_tail = ci;
717   }
718
719   ci->flags |= CI_FLAGS_IN_QUEUE;
720
721   pthread_cond_signal(&queue_cond);
722   pthread_mutex_lock (&stats_lock);
723   stats_queue_length++;
724   pthread_mutex_unlock (&stats_lock);
725
726   return (0);
727 } /* }}} int enqueue_cache_item */
728
729 /*
730  * tree_callback_flush:
731  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
732  * while this is in progress.
733  */
734 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
735     gpointer data)
736 {
737   cache_item_t *ci;
738   callback_flush_data_t *cfd;
739
740   ci = (cache_item_t *) value;
741   cfd = (callback_flush_data_t *) data;
742
743   if (ci->flags & CI_FLAGS_IN_QUEUE)
744     return FALSE;
745
746   if (ci->values_num > 0
747       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
748   {
749     enqueue_cache_item (ci, TAIL);
750   }
751   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
752       && (ci->values_num <= 0))
753   {
754     assert ((char *) key == ci->file);
755     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
756     {
757       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
758       return (FALSE);
759     }
760   }
761
762   return (FALSE);
763 } /* }}} gboolean tree_callback_flush */
764
765 static int flush_old_values (int max_age)
766 {
767   callback_flush_data_t cfd;
768   size_t k;
769
770   memset (&cfd, 0, sizeof (cfd));
771   /* Pass the current time as user data so that we don't need to call
772    * `time' for each node. */
773   cfd.now = time (NULL);
774   cfd.keys = NULL;
775   cfd.keys_num = 0;
776
777   if (max_age > 0)
778     cfd.abs_timeout = cfd.now - max_age;
779   else
780     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
781
782   /* `tree_callback_flush' will return the keys of all values that haven't
783    * been touched in the last `config_flush_interval' seconds in `cfd'.
784    * The char*'s in this array point to the same memory as ci->file, so we
785    * don't need to free them separately. */
786   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
787
788   for (k = 0; k < cfd.keys_num; k++)
789   {
790     /* should never fail, since we have held the cache_lock
791      * the entire time */
792     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
793   }
794
795   if (cfd.keys != NULL)
796   {
797     free (cfd.keys);
798     cfd.keys = NULL;
799   }
800
801   return (0);
802 } /* int flush_old_values */
803
804 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
805 {
806   struct timeval now;
807   struct timespec next_flush;
808   int status;
809
810   gettimeofday (&now, NULL);
811   next_flush.tv_sec = now.tv_sec + config_flush_interval;
812   next_flush.tv_nsec = 1000 * now.tv_usec;
813
814   pthread_mutex_lock(&cache_lock);
815
816   while (state == RUNNING)
817   {
818     gettimeofday (&now, NULL);
819     if ((now.tv_sec > next_flush.tv_sec)
820         || ((now.tv_sec == next_flush.tv_sec)
821           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
822     {
823       /* Flush all values that haven't been written in the last
824        * `config_write_interval' seconds. */
825       flush_old_values (config_write_interval);
826
827       /* Determine the time of the next cache flush. */
828       next_flush.tv_sec =
829         now.tv_sec + next_flush.tv_sec % config_flush_interval;
830
831       /* unlock the cache while we rotate so we don't block incoming
832        * updates if the fsync() blocks on disk I/O */
833       pthread_mutex_unlock(&cache_lock);
834       journal_rotate();
835       pthread_mutex_lock(&cache_lock);
836     }
837
838     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
839     if (status != 0 && status != ETIMEDOUT)
840     {
841       RRDD_LOG (LOG_ERR, "flush_thread_main: "
842                 "pthread_cond_timedwait returned %i.", status);
843     }
844   }
845
846   if (config_flush_at_shutdown)
847     flush_old_values (-1); /* flush everything */
848
849   state = SHUTDOWN;
850
851   pthread_mutex_unlock(&cache_lock);
852
853   return NULL;
854 } /* void *flush_thread_main */
855
856 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
857 {
858   pthread_mutex_lock (&cache_lock);
859
860   while (state != SHUTDOWN
861          || (cache_queue_head != NULL && config_flush_at_shutdown))
862   {
863     cache_item_t *ci;
864     char *file;
865     char **values;
866     size_t values_num;
867     int status;
868
869     /* Now, check if there's something to store away. If not, wait until
870      * something comes in. */
871     if (cache_queue_head == NULL)
872     {
873       status = pthread_cond_wait (&queue_cond, &cache_lock);
874       if ((status != 0) && (status != ETIMEDOUT))
875       {
876         RRDD_LOG (LOG_ERR, "queue_thread_main: "
877             "pthread_cond_wait returned %i.", status);
878       }
879     }
880
881     /* Check if a value has arrived. This may be NULL if we timed out or there
882      * was an interrupt such as a signal. */
883     if (cache_queue_head == NULL)
884       continue;
885
886     ci = cache_queue_head;
887
888     /* copy the relevant parts */
889     file = strdup (ci->file);
890     if (file == NULL)
891     {
892       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
893       continue;
894     }
895
896     assert(ci->values != NULL);
897     assert(ci->values_num > 0);
898
899     values = ci->values;
900     values_num = ci->values_num;
901
902     wipe_ci_values(ci, time(NULL));
903     remove_from_queue(ci);
904
905     pthread_mutex_unlock (&cache_lock);
906
907     rrd_clear_error ();
908     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
909     if (status != 0)
910     {
911       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
912           "rrd_update_r (%s) failed with status %i. (%s)",
913           file, status, rrd_get_error());
914     }
915
916     journal_write("wrote", file);
917
918     /* Search again in the tree.  It's possible someone issued a "FORGET"
919      * while we were writing the update values. */
920     pthread_mutex_lock(&cache_lock);
921     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
922     if (ci)
923       pthread_cond_broadcast(&ci->flushed);
924     pthread_mutex_unlock(&cache_lock);
925
926     rrd_free_ptrs((void ***) &values, &values_num);
927     free(file);
928
929     if (status == 0)
930     {
931       pthread_mutex_lock (&stats_lock);
932       stats_updates_written++;
933       stats_data_sets_written += values_num;
934       pthread_mutex_unlock (&stats_lock);
935     }
936
937     pthread_mutex_lock (&cache_lock);
938   }
939   pthread_mutex_unlock (&cache_lock);
940
941   return (NULL);
942 } /* }}} void *queue_thread_main */
943
944 static int buffer_get_field (char **buffer_ret, /* {{{ */
945     size_t *buffer_size_ret, char **field_ret)
946 {
947   char *buffer;
948   size_t buffer_pos;
949   size_t buffer_size;
950   char *field;
951   size_t field_size;
952   int status;
953
954   buffer = *buffer_ret;
955   buffer_pos = 0;
956   buffer_size = *buffer_size_ret;
957   field = *buffer_ret;
958   field_size = 0;
959
960   if (buffer_size <= 0)
961     return (-1);
962
963   /* This is ensured by `handle_request'. */
964   assert (buffer[buffer_size - 1] == '\0');
965
966   status = -1;
967   while (buffer_pos < buffer_size)
968   {
969     /* Check for end-of-field or end-of-buffer */
970     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
971     {
972       field[field_size] = 0;
973       field_size++;
974       buffer_pos++;
975       status = 0;
976       break;
977     }
978     /* Handle escaped characters. */
979     else if (buffer[buffer_pos] == '\\')
980     {
981       if (buffer_pos >= (buffer_size - 1))
982         break;
983       buffer_pos++;
984       field[field_size] = buffer[buffer_pos];
985       field_size++;
986       buffer_pos++;
987     }
988     /* Normal operation */ 
989     else
990     {
991       field[field_size] = buffer[buffer_pos];
992       field_size++;
993       buffer_pos++;
994     }
995   } /* while (buffer_pos < buffer_size) */
996
997   if (status != 0)
998     return (status);
999
1000   *buffer_ret = buffer + buffer_pos;
1001   *buffer_size_ret = buffer_size - buffer_pos;
1002   *field_ret = field;
1003
1004   return (0);
1005 } /* }}} int buffer_get_field */
1006
1007 /* if we're restricting writes to the base directory,
1008  * check whether the file falls within the dir
1009  * returns 1 if OK, otherwise 0
1010  */
1011 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1012 {
1013   assert(file != NULL);
1014
1015   if (!config_write_base_only
1016       || sock == NULL /* journal replay */
1017       || config_base_dir == NULL)
1018     return 1;
1019
1020   if (strstr(file, "../") != NULL) goto err;
1021
1022   /* relative paths without "../" are ok */
1023   if (*file != '/') return 1;
1024
1025   /* file must be of the format base + "/" + <1+ char filename> */
1026   if (strlen(file) < _config_base_dir_len + 2) goto err;
1027   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1028   if (*(file + _config_base_dir_len) != '/') goto err;
1029
1030   return 1;
1031
1032 err:
1033   if (sock != NULL && sock->fd >= 0)
1034     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1035
1036   return 0;
1037 } /* }}} static int check_file_access */
1038
1039 /* when using a base dir, convert relative paths to absolute paths.
1040  * if necessary, modifies the "filename" pointer to point
1041  * to the new path created in "tmp".  "tmp" is provided
1042  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1043  *
1044  * this allows us to optimize for the expected case (absolute path)
1045  * with a no-op.
1046  */
1047 static void get_abs_path(char **filename, char *tmp)
1048 {
1049   assert(tmp != NULL);
1050   assert(filename != NULL && *filename != NULL);
1051
1052   if (config_base_dir == NULL || **filename == '/')
1053     return;
1054
1055   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1056   *filename = tmp;
1057 } /* }}} static int get_abs_path */
1058
1059 /* returns 1 if we have the required privilege level,
1060  * otherwise issue an error to the user on sock */
1061 static int has_privilege (listen_socket_t *sock, /* {{{ */
1062                           socket_privilege priv)
1063 {
1064   if (sock == NULL) /* journal replay */
1065     return 1;
1066
1067   if (sock->privilege >= priv)
1068     return 1;
1069
1070   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1071 } /* }}} static int has_privilege */
1072
1073 static int flush_file (const char *filename) /* {{{ */
1074 {
1075   cache_item_t *ci;
1076
1077   pthread_mutex_lock (&cache_lock);
1078
1079   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1080   if (ci == NULL)
1081   {
1082     pthread_mutex_unlock (&cache_lock);
1083     return (ENOENT);
1084   }
1085
1086   if (ci->values_num > 0)
1087   {
1088     /* Enqueue at head */
1089     enqueue_cache_item (ci, HEAD);
1090     pthread_cond_wait(&ci->flushed, &cache_lock);
1091   }
1092
1093   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1094    * may have been purged during our cond_wait() */
1095
1096   pthread_mutex_unlock(&cache_lock);
1097
1098   return (0);
1099 } /* }}} int flush_file */
1100
1101 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1102 {
1103   char *err = "Syntax error.\n";
1104
1105   if (cmd && cmd->syntax)
1106     err = cmd->syntax;
1107
1108   return send_response(sock, RESP_ERR, "Usage: %s", err);
1109 } /* }}} static int syntax_error() */
1110
1111 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1112 {
1113   uint64_t copy_queue_length;
1114   uint64_t copy_updates_received;
1115   uint64_t copy_flush_received;
1116   uint64_t copy_updates_written;
1117   uint64_t copy_data_sets_written;
1118   uint64_t copy_journal_bytes;
1119   uint64_t copy_journal_rotate;
1120
1121   uint64_t tree_nodes_number;
1122   uint64_t tree_depth;
1123
1124   pthread_mutex_lock (&stats_lock);
1125   copy_queue_length       = stats_queue_length;
1126   copy_updates_received   = stats_updates_received;
1127   copy_flush_received     = stats_flush_received;
1128   copy_updates_written    = stats_updates_written;
1129   copy_data_sets_written  = stats_data_sets_written;
1130   copy_journal_bytes      = stats_journal_bytes;
1131   copy_journal_rotate     = stats_journal_rotate;
1132   pthread_mutex_unlock (&stats_lock);
1133
1134   pthread_mutex_lock (&cache_lock);
1135   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1136   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1137   pthread_mutex_unlock (&cache_lock);
1138
1139   add_response_info(sock,
1140                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1141   add_response_info(sock,
1142                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1143   add_response_info(sock,
1144                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1145   add_response_info(sock,
1146                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1147   add_response_info(sock,
1148                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1149   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1150   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1151   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1152   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1153
1154   send_response(sock, RESP_OK, "Statistics follow\n");
1155
1156   return (0);
1157 } /* }}} int handle_request_stats */
1158
1159 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1160 {
1161   char *file, file_tmp[PATH_MAX];
1162   int status;
1163
1164   status = buffer_get_field (&buffer, &buffer_size, &file);
1165   if (status != 0)
1166   {
1167     return syntax_error(sock,cmd);
1168   }
1169   else
1170   {
1171     pthread_mutex_lock(&stats_lock);
1172     stats_flush_received++;
1173     pthread_mutex_unlock(&stats_lock);
1174
1175     get_abs_path(&file, file_tmp);
1176     if (!check_file_access(file, sock)) return 0;
1177
1178     status = flush_file (file);
1179     if (status == 0)
1180       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1181     else if (status == ENOENT)
1182     {
1183       /* no file in our tree; see whether it exists at all */
1184       struct stat statbuf;
1185
1186       memset(&statbuf, 0, sizeof(statbuf));
1187       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1188         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1189       else
1190         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1191     }
1192     else if (status < 0)
1193       return send_response(sock, RESP_ERR, "Internal error.\n");
1194     else
1195       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1196   }
1197
1198   /* NOTREACHED */
1199   assert(1==0);
1200 } /* }}} int handle_request_flush */
1201
1202 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1203 {
1204   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1205
1206   pthread_mutex_lock(&cache_lock);
1207   flush_old_values(-1);
1208   pthread_mutex_unlock(&cache_lock);
1209
1210   return send_response(sock, RESP_OK, "Started flush.\n");
1211 } /* }}} static int handle_request_flushall */
1212
1213 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1214 {
1215   int status;
1216   char *file, file_tmp[PATH_MAX];
1217   cache_item_t *ci;
1218
1219   status = buffer_get_field(&buffer, &buffer_size, &file);
1220   if (status != 0)
1221     return syntax_error(sock,cmd);
1222
1223   get_abs_path(&file, file_tmp);
1224
1225   pthread_mutex_lock(&cache_lock);
1226   ci = g_tree_lookup(cache_tree, file);
1227   if (ci == NULL)
1228   {
1229     pthread_mutex_unlock(&cache_lock);
1230     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1231   }
1232
1233   for (size_t i=0; i < ci->values_num; i++)
1234     add_response_info(sock, "%s\n", ci->values[i]);
1235
1236   pthread_mutex_unlock(&cache_lock);
1237   return send_response(sock, RESP_OK, "updates pending\n");
1238 } /* }}} static int handle_request_pending */
1239
1240 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1241 {
1242   int status;
1243   gboolean found;
1244   char *file, file_tmp[PATH_MAX];
1245
1246   status = buffer_get_field(&buffer, &buffer_size, &file);
1247   if (status != 0)
1248     return syntax_error(sock,cmd);
1249
1250   get_abs_path(&file, file_tmp);
1251   if (!check_file_access(file, sock)) return 0;
1252
1253   pthread_mutex_lock(&cache_lock);
1254   found = g_tree_remove(cache_tree, file);
1255   pthread_mutex_unlock(&cache_lock);
1256
1257   if (found == TRUE)
1258   {
1259     if (sock != NULL)
1260       journal_write("forget", file);
1261
1262     return send_response(sock, RESP_OK, "Gone!\n");
1263   }
1264   else
1265     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1266
1267   /* NOTREACHED */
1268   assert(1==0);
1269 } /* }}} static int handle_request_forget */
1270
1271 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1272 {
1273   cache_item_t *ci;
1274
1275   pthread_mutex_lock(&cache_lock);
1276
1277   ci = cache_queue_head;
1278   while (ci != NULL)
1279   {
1280     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1281     ci = ci->next;
1282   }
1283
1284   pthread_mutex_unlock(&cache_lock);
1285
1286   return send_response(sock, RESP_OK, "in queue.\n");
1287 } /* }}} int handle_request_queue */
1288
1289 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1290 {
1291   char *file, file_tmp[PATH_MAX];
1292   int values_num = 0;
1293   int status;
1294   char orig_buf[CMD_MAX];
1295
1296   cache_item_t *ci;
1297
1298   /* save it for the journal later */
1299   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1300
1301   status = buffer_get_field (&buffer, &buffer_size, &file);
1302   if (status != 0)
1303     return syntax_error(sock,cmd);
1304
1305   pthread_mutex_lock(&stats_lock);
1306   stats_updates_received++;
1307   pthread_mutex_unlock(&stats_lock);
1308
1309   get_abs_path(&file, file_tmp);
1310   if (!check_file_access(file, sock)) return 0;
1311
1312   pthread_mutex_lock (&cache_lock);
1313   ci = g_tree_lookup (cache_tree, file);
1314
1315   if (ci == NULL) /* {{{ */
1316   {
1317     struct stat statbuf;
1318     cache_item_t *tmp;
1319
1320     /* don't hold the lock while we setup; stat(2) might block */
1321     pthread_mutex_unlock(&cache_lock);
1322
1323     memset (&statbuf, 0, sizeof (statbuf));
1324     status = stat (file, &statbuf);
1325     if (status != 0)
1326     {
1327       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1328
1329       status = errno;
1330       if (status == ENOENT)
1331         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1332       else
1333         return send_response(sock, RESP_ERR,
1334                              "stat failed with error %i.\n", status);
1335     }
1336     if (!S_ISREG (statbuf.st_mode))
1337       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1338
1339     if (access(file, R_OK|W_OK) != 0)
1340       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1341                            file, rrd_strerror(errno));
1342
1343     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1344     if (ci == NULL)
1345     {
1346       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1347
1348       return send_response(sock, RESP_ERR, "malloc failed.\n");
1349     }
1350     memset (ci, 0, sizeof (cache_item_t));
1351
1352     ci->file = strdup (file);
1353     if (ci->file == NULL)
1354     {
1355       free (ci);
1356       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1357
1358       return send_response(sock, RESP_ERR, "strdup failed.\n");
1359     }
1360
1361     wipe_ci_values(ci, now);
1362     ci->flags = CI_FLAGS_IN_TREE;
1363     pthread_cond_init(&ci->flushed, NULL);
1364
1365     pthread_mutex_lock(&cache_lock);
1366
1367     /* another UPDATE might have added this entry in the meantime */
1368     tmp = g_tree_lookup (cache_tree, file);
1369     if (tmp == NULL)
1370       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1371     else
1372     {
1373       free_cache_item (ci);
1374       ci = tmp;
1375     }
1376
1377     /* state may have changed while we were unlocked */
1378     if (state == SHUTDOWN)
1379       return -1;
1380   } /* }}} */
1381   assert (ci != NULL);
1382
1383   /* don't re-write updates in replay mode */
1384   if (sock != NULL)
1385     journal_write("update", orig_buf);
1386
1387   while (buffer_size > 0)
1388   {
1389     char *value;
1390     time_t stamp;
1391     char *eostamp;
1392
1393     status = buffer_get_field (&buffer, &buffer_size, &value);
1394     if (status != 0)
1395     {
1396       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1397       break;
1398     }
1399
1400     /* make sure update time is always moving forward */
1401     stamp = strtol(value, &eostamp, 10);
1402     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1403     {
1404       pthread_mutex_unlock(&cache_lock);
1405       return send_response(sock, RESP_ERR,
1406                            "Cannot find timestamp in '%s'!\n", value);
1407     }
1408     else if (stamp <= ci->last_update_stamp)
1409     {
1410       pthread_mutex_unlock(&cache_lock);
1411       return send_response(sock, RESP_ERR,
1412                            "illegal attempt to update using time %ld when last"
1413                            " update time is %ld (minimum one second step)\n",
1414                            stamp, ci->last_update_stamp);
1415     }
1416     else
1417       ci->last_update_stamp = stamp;
1418
1419     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1420     {
1421       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1422       continue;
1423     }
1424
1425     values_num++;
1426   }
1427
1428   if (((now - ci->last_flush_time) >= config_write_interval)
1429       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1430       && (ci->values_num > 0))
1431   {
1432     enqueue_cache_item (ci, TAIL);
1433   }
1434
1435   pthread_mutex_unlock (&cache_lock);
1436
1437   if (values_num < 1)
1438     return send_response(sock, RESP_ERR, "No values updated.\n");
1439   else
1440     return send_response(sock, RESP_OK,
1441                          "errors, enqueued %i value(s).\n", values_num);
1442
1443   /* NOTREACHED */
1444   assert(1==0);
1445
1446 } /* }}} int handle_request_update */
1447
1448 /* we came across a "WROTE" entry during journal replay.
1449  * throw away any values that we have accumulated for this file
1450  */
1451 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1452 {
1453   cache_item_t *ci;
1454   const char *file = buffer;
1455
1456   pthread_mutex_lock(&cache_lock);
1457
1458   ci = g_tree_lookup(cache_tree, file);
1459   if (ci == NULL)
1460   {
1461     pthread_mutex_unlock(&cache_lock);
1462     return (0);
1463   }
1464
1465   if (ci->values)
1466     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1467
1468   wipe_ci_values(ci, now);
1469   remove_from_queue(ci);
1470
1471   pthread_mutex_unlock(&cache_lock);
1472   return (0);
1473 } /* }}} int handle_request_wrote */
1474
1475 /* start "BATCH" processing */
1476 static int batch_start (HANDLER_PROTO) /* {{{ */
1477 {
1478   int status;
1479   if (sock->batch_start)
1480     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1481
1482   status = send_response(sock, RESP_OK,
1483                          "Go ahead.  End with dot '.' on its own line.\n");
1484   sock->batch_start = time(NULL);
1485   sock->batch_cmd = 0;
1486
1487   return status;
1488 } /* }}} static int batch_start */
1489
1490 /* finish "BATCH" processing and return results to the client */
1491 static int batch_done (HANDLER_PROTO) /* {{{ */
1492 {
1493   assert(sock->batch_start);
1494   sock->batch_start = 0;
1495   sock->batch_cmd  = 0;
1496   return send_response(sock, RESP_OK, "errors\n");
1497 } /* }}} static int batch_done */
1498
1499 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1500 {
1501   return -1;
1502 } /* }}} static int handle_request_quit */
1503
1504 struct command COMMANDS[] = {
1505   {
1506     "UPDATE",
1507     handle_request_update,
1508     PRIV_HIGH,
1509     CMD_CONTEXT_ANY,
1510     "UPDATE <filename> <values> [<values> ...]\n"
1511     ,
1512     "Adds the given file to the internal cache if it is not yet known and\n"
1513     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1514     "for details.\n"
1515     "\n"
1516     "Each <values> has the following form:\n"
1517     "  <values> = <time>:<value>[:<value>[...]]\n"
1518     "See the rrdupdate(1) manpage for details.\n"
1519   },
1520   {
1521     "WROTE",
1522     handle_request_wrote,
1523     PRIV_HIGH,
1524     CMD_CONTEXT_JOURNAL,
1525     NULL,
1526     NULL
1527   },
1528   {
1529     "FLUSH",
1530     handle_request_flush,
1531     PRIV_LOW,
1532     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1533     "FLUSH <filename>\n"
1534     ,
1535     "Adds the given filename to the head of the update queue and returns\n"
1536     "after it has been dequeued.\n"
1537   },
1538   {
1539     "FLUSHALL",
1540     handle_request_flushall,
1541     PRIV_HIGH,
1542     CMD_CONTEXT_CLIENT,
1543     "FLUSHALL\n"
1544     ,
1545     "Triggers writing of all pending updates.  Returns immediately.\n"
1546   },
1547   {
1548     "PENDING",
1549     handle_request_pending,
1550     PRIV_HIGH,
1551     CMD_CONTEXT_CLIENT,
1552     "PENDING <filename>\n"
1553     ,
1554     "Shows any 'pending' updates for a file, in order.\n"
1555     "The updates shown have not yet been written to the underlying RRD file.\n"
1556   },
1557   {
1558     "FORGET",
1559     handle_request_forget,
1560     PRIV_HIGH,
1561     CMD_CONTEXT_ANY,
1562     "FORGET <filename>\n"
1563     ,
1564     "Removes the file completely from the cache.\n"
1565     "Any pending updates for the file will be lost.\n"
1566   },
1567   {
1568     "QUEUE",
1569     handle_request_queue,
1570     PRIV_LOW,
1571     CMD_CONTEXT_CLIENT,
1572     "QUEUE\n"
1573     ,
1574         "Shows all files in the output queue.\n"
1575     "The output is zero or more lines in the following format:\n"
1576     "(where <num_vals> is the number of values to be written)\n"
1577     "\n"
1578     "<num_vals> <filename>\n"
1579   },
1580   {
1581     "STATS",
1582     handle_request_stats,
1583     PRIV_LOW,
1584     CMD_CONTEXT_CLIENT,
1585     "STATS\n"
1586     ,
1587     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1588     "a description of the values.\n"
1589   },
1590   {
1591     "HELP",
1592     handle_request_help,
1593     PRIV_LOW,
1594     CMD_CONTEXT_CLIENT,
1595     "HELP [<command>]\n",
1596     NULL, /* special! */
1597   },
1598   {
1599     "BATCH",
1600     batch_start,
1601     PRIV_LOW,
1602     CMD_CONTEXT_CLIENT,
1603     "BATCH\n"
1604     ,
1605     "The 'BATCH' command permits the client to initiate a bulk load\n"
1606     "   of commands to rrdcached.\n"
1607     "\n"
1608     "Usage:\n"
1609     "\n"
1610     "    client: BATCH\n"
1611     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1612     "    client: command #1\n"
1613     "    client: command #2\n"
1614     "    client: ... and so on\n"
1615     "    client: .\n"
1616     "    server: 2 errors\n"
1617     "    server: 7 message for command #7\n"
1618     "    server: 9 message for command #9\n"
1619     "\n"
1620     "For more information, consult the rrdcached(1) documentation.\n"
1621   },
1622   {
1623     ".",   /* BATCH terminator */
1624     batch_done,
1625     PRIV_LOW,
1626     CMD_CONTEXT_BATCH,
1627     NULL,
1628     NULL
1629   },
1630   {
1631     "QUIT",
1632     handle_request_quit,
1633     PRIV_LOW,
1634     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1635     "QUIT\n"
1636     ,
1637     "Disconnect from rrdcached.\n"
1638   },
1639   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1640 };
1641
1642 static struct command *find_command(char *cmd)
1643 {
1644   struct command *c = COMMANDS;
1645
1646   while (c->cmd != NULL)
1647   {
1648     if (strcasecmp(cmd, c->cmd) == 0)
1649       break;
1650     c++;
1651   }
1652
1653   if (c->cmd == NULL)
1654     return NULL;
1655   else
1656     return c;
1657 }
1658
1659 /* check whether commands are received in the expected context */
1660 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1661 {
1662   if (sock == NULL)
1663     return (cmd->context & CMD_CONTEXT_JOURNAL);
1664   else if (sock->batch_start)
1665     return (cmd->context & CMD_CONTEXT_BATCH);
1666   else
1667     return (cmd->context & CMD_CONTEXT_CLIENT);
1668
1669   /* NOTREACHED */
1670   assert(1==0);
1671 }
1672
1673 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1674 {
1675   int status;
1676   char *cmd_str;
1677   char *resp_txt;
1678   struct command *help = NULL;
1679
1680   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1681   if (status == 0)
1682     help = find_command(cmd_str);
1683
1684   if (help && (help->syntax || help->help))
1685   {
1686     char tmp[CMD_MAX];
1687
1688     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1689     resp_txt = tmp;
1690
1691     if (help->syntax)
1692       add_response_info(sock, "Usage: %s\n", help->syntax);
1693
1694     if (help->help)
1695       add_response_info(sock, "%s\n", help->help);
1696   }
1697   else
1698   {
1699     help = COMMANDS;
1700     resp_txt = "Command overview\n";
1701
1702     while (help->cmd)
1703     {
1704       if (help->syntax)
1705         add_response_info(sock, "%s", help->syntax);
1706       help++;
1707     }
1708   }
1709
1710   return send_response(sock, RESP_OK, resp_txt);
1711 } /* }}} int handle_request_help */
1712
1713 /* if sock==NULL, we are in journal replay mode */
1714 static int handle_request (DISPATCH_PROTO) /* {{{ */
1715 {
1716   char *buffer_ptr = buffer;
1717   char *cmd_str = NULL;
1718   struct command *cmd = NULL;
1719   int status;
1720
1721   assert (buffer[buffer_size - 1] == '\0');
1722
1723   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1724   if (status != 0)
1725   {
1726     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1727     return (-1);
1728   }
1729
1730   if (sock != NULL && sock->batch_start)
1731     sock->batch_cmd++;
1732
1733   cmd = find_command(cmd_str);
1734   if (!cmd)
1735     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1736
1737   status = has_privilege(sock, cmd->min_priv);
1738   if (status <= 0)
1739     return status;
1740
1741   if (!command_check_context(sock, cmd))
1742     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1743
1744   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1745 } /* }}} int handle_request */
1746
1747 /* MUST NOT hold journal_lock before calling this */
1748 static void journal_rotate(void) /* {{{ */
1749 {
1750   FILE *old_fh = NULL;
1751   int new_fd;
1752
1753   if (journal_cur == NULL || journal_old == NULL)
1754     return;
1755
1756   pthread_mutex_lock(&journal_lock);
1757
1758   /* we rotate this way (rename before close) so that the we can release
1759    * the journal lock as fast as possible.  Journal writes to the new
1760    * journal can proceed immediately after the new file is opened.  The
1761    * fclose can then block without affecting new updates.
1762    */
1763   if (journal_fh != NULL)
1764   {
1765     old_fh = journal_fh;
1766     journal_fh = NULL;
1767     rename(journal_cur, journal_old);
1768     ++stats_journal_rotate;
1769   }
1770
1771   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1772                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1773   if (new_fd >= 0)
1774   {
1775     journal_fh = fdopen(new_fd, "a");
1776     if (journal_fh == NULL)
1777       close(new_fd);
1778   }
1779
1780   pthread_mutex_unlock(&journal_lock);
1781
1782   if (old_fh != NULL)
1783     fclose(old_fh);
1784
1785   if (journal_fh == NULL)
1786   {
1787     RRDD_LOG(LOG_CRIT,
1788              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1789              journal_cur, rrd_strerror(errno));
1790
1791     RRDD_LOG(LOG_ERR,
1792              "JOURNALING DISABLED: All values will be flushed at shutdown");
1793     config_flush_at_shutdown = 1;
1794   }
1795
1796 } /* }}} static void journal_rotate */
1797
1798 static void journal_done(void) /* {{{ */
1799 {
1800   if (journal_cur == NULL)
1801     return;
1802
1803   pthread_mutex_lock(&journal_lock);
1804   if (journal_fh != NULL)
1805   {
1806     fclose(journal_fh);
1807     journal_fh = NULL;
1808   }
1809
1810   if (config_flush_at_shutdown)
1811   {
1812     RRDD_LOG(LOG_INFO, "removing journals");
1813     unlink(journal_old);
1814     unlink(journal_cur);
1815   }
1816   else
1817   {
1818     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1819              "journals will be used at next startup");
1820   }
1821
1822   pthread_mutex_unlock(&journal_lock);
1823
1824 } /* }}} static void journal_done */
1825
1826 static int journal_write(char *cmd, char *args) /* {{{ */
1827 {
1828   int chars;
1829
1830   if (journal_fh == NULL)
1831     return 0;
1832
1833   pthread_mutex_lock(&journal_lock);
1834   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1835   pthread_mutex_unlock(&journal_lock);
1836
1837   if (chars > 0)
1838   {
1839     pthread_mutex_lock(&stats_lock);
1840     stats_journal_bytes += chars;
1841     pthread_mutex_unlock(&stats_lock);
1842   }
1843
1844   return chars;
1845 } /* }}} static int journal_write */
1846
1847 static int journal_replay (const char *file) /* {{{ */
1848 {
1849   FILE *fh;
1850   int entry_cnt = 0;
1851   int fail_cnt = 0;
1852   uint64_t line = 0;
1853   char entry[CMD_MAX];
1854   time_t now;
1855
1856   if (file == NULL) return 0;
1857
1858   {
1859     char *reason = "unknown error";
1860     int status = 0;
1861     struct stat statbuf;
1862
1863     memset(&statbuf, 0, sizeof(statbuf));
1864     if (stat(file, &statbuf) != 0)
1865     {
1866       if (errno == ENOENT)
1867         return 0;
1868
1869       reason = "stat error";
1870       status = errno;
1871     }
1872     else if (!S_ISREG(statbuf.st_mode))
1873     {
1874       reason = "not a regular file";
1875       status = EPERM;
1876     }
1877     if (statbuf.st_uid != daemon_uid)
1878     {
1879       reason = "not owned by daemon user";
1880       status = EACCES;
1881     }
1882     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1883     {
1884       reason = "must not be user/group writable";
1885       status = EACCES;
1886     }
1887
1888     if (status != 0)
1889     {
1890       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1891                file, rrd_strerror(status), reason);
1892       return 0;
1893     }
1894   }
1895
1896   fh = fopen(file, "r");
1897   if (fh == NULL)
1898   {
1899     if (errno != ENOENT)
1900       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1901                file, rrd_strerror(errno));
1902     return 0;
1903   }
1904   else
1905     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1906
1907   now = time(NULL);
1908
1909   while(!feof(fh))
1910   {
1911     size_t entry_len;
1912
1913     ++line;
1914     if (fgets(entry, sizeof(entry), fh) == NULL)
1915       break;
1916     entry_len = strlen(entry);
1917
1918     /* check \n termination in case journal writing crashed mid-line */
1919     if (entry_len == 0)
1920       continue;
1921     else if (entry[entry_len - 1] != '\n')
1922     {
1923       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1924       ++fail_cnt;
1925       continue;
1926     }
1927
1928     entry[entry_len - 1] = '\0';
1929
1930     if (handle_request(NULL, now, entry, entry_len) == 0)
1931       ++entry_cnt;
1932     else
1933       ++fail_cnt;
1934   }
1935
1936   fclose(fh);
1937
1938   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1939            entry_cnt, fail_cnt);
1940
1941   return entry_cnt > 0 ? 1 : 0;
1942 } /* }}} static int journal_replay */
1943
1944 static void journal_init(void) /* {{{ */
1945 {
1946   int had_journal = 0;
1947
1948   if (journal_cur == NULL) return;
1949
1950   pthread_mutex_lock(&journal_lock);
1951
1952   RRDD_LOG(LOG_INFO, "checking for journal files");
1953
1954   had_journal += journal_replay(journal_old);
1955   had_journal += journal_replay(journal_cur);
1956
1957   /* it must have been a crash.  start a flush */
1958   if (had_journal && config_flush_at_shutdown)
1959     flush_old_values(-1);
1960
1961   pthread_mutex_unlock(&journal_lock);
1962   journal_rotate();
1963
1964   RRDD_LOG(LOG_INFO, "journal processing complete");
1965
1966 } /* }}} static void journal_init */
1967
1968 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1969 {
1970   assert(sock != NULL);
1971
1972   free(sock->rbuf);  sock->rbuf = NULL;
1973   free(sock->wbuf);  sock->wbuf = NULL;
1974   free(sock);
1975 } /* }}} void free_listen_socket */
1976
1977 static void close_connection(listen_socket_t *sock) /* {{{ */
1978 {
1979   if (sock->fd >= 0)
1980   {
1981     close(sock->fd);
1982     sock->fd = -1;
1983   }
1984
1985   free_listen_socket(sock);
1986
1987 } /* }}} void close_connection */
1988
1989 static void *connection_thread_main (void *args) /* {{{ */
1990 {
1991   listen_socket_t *sock;
1992   int fd;
1993
1994   sock = (listen_socket_t *) args;
1995   fd = sock->fd;
1996
1997   /* init read buffers */
1998   sock->next_read = sock->next_cmd = 0;
1999   sock->rbuf = malloc(RBUF_SIZE);
2000   if (sock->rbuf == NULL)
2001   {
2002     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2003     close_connection(sock);
2004     return NULL;
2005   }
2006
2007   pthread_mutex_lock (&connection_threads_lock);
2008   connection_threads_num++;
2009   pthread_mutex_unlock (&connection_threads_lock);
2010
2011   while (state == RUNNING)
2012   {
2013     char *cmd;
2014     ssize_t cmd_len;
2015     ssize_t rbytes;
2016     time_t now;
2017
2018     struct pollfd pollfd;
2019     int status;
2020
2021     pollfd.fd = fd;
2022     pollfd.events = POLLIN | POLLPRI;
2023     pollfd.revents = 0;
2024
2025     status = poll (&pollfd, 1, /* timeout = */ 500);
2026     if (state != RUNNING)
2027       break;
2028     else if (status == 0) /* timeout */
2029       continue;
2030     else if (status < 0) /* error */
2031     {
2032       status = errno;
2033       if (status != EINTR)
2034         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2035       continue;
2036     }
2037
2038     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2039       break;
2040     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2041     {
2042       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2043           "poll(2) returned something unexpected: %#04hx",
2044           pollfd.revents);
2045       break;
2046     }
2047
2048     rbytes = read(fd, sock->rbuf + sock->next_read,
2049                   RBUF_SIZE - sock->next_read);
2050     if (rbytes < 0)
2051     {
2052       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2053       break;
2054     }
2055     else if (rbytes == 0)
2056       break; /* eof */
2057
2058     sock->next_read += rbytes;
2059
2060     if (sock->batch_start)
2061       now = sock->batch_start;
2062     else
2063       now = time(NULL);
2064
2065     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2066     {
2067       status = handle_request (sock, now, cmd, cmd_len+1);
2068       if (status != 0)
2069         goto out_close;
2070     }
2071   }
2072
2073 out_close:
2074   close_connection(sock);
2075
2076   /* Remove this thread from the connection threads list */
2077   pthread_mutex_lock (&connection_threads_lock);
2078   connection_threads_num--;
2079   if (connection_threads_num <= 0)
2080     pthread_cond_broadcast(&connection_threads_done);
2081   pthread_mutex_unlock (&connection_threads_lock);
2082
2083   return (NULL);
2084 } /* }}} void *connection_thread_main */
2085
2086 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2087 {
2088   int fd;
2089   struct sockaddr_un sa;
2090   listen_socket_t *temp;
2091   int status;
2092   const char *path;
2093
2094   path = sock->addr;
2095   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2096     path += strlen("unix:");
2097
2098   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2099       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2100   if (temp == NULL)
2101   {
2102     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2103     return (-1);
2104   }
2105   listen_fds = temp;
2106   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2107
2108   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2109   if (fd < 0)
2110   {
2111     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2112              rrd_strerror(errno));
2113     return (-1);
2114   }
2115
2116   memset (&sa, 0, sizeof (sa));
2117   sa.sun_family = AF_UNIX;
2118   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2119
2120   /* if we've gotten this far, we own the pid file.  any daemon started
2121    * with the same args must not be alive.  therefore, ensure that we can
2122    * create the socket...
2123    */
2124   unlink(path);
2125
2126   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2127   if (status != 0)
2128   {
2129     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2130              path, rrd_strerror(errno));
2131     close (fd);
2132     return (-1);
2133   }
2134
2135   status = listen (fd, /* backlog = */ 10);
2136   if (status != 0)
2137   {
2138     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2139              path, rrd_strerror(errno));
2140     close (fd);
2141     unlink (path);
2142     return (-1);
2143   }
2144
2145   listen_fds[listen_fds_num].fd = fd;
2146   listen_fds[listen_fds_num].family = PF_UNIX;
2147   strncpy(listen_fds[listen_fds_num].addr, path,
2148           sizeof (listen_fds[listen_fds_num].addr) - 1);
2149   listen_fds_num++;
2150
2151   return (0);
2152 } /* }}} int open_listen_socket_unix */
2153
2154 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2155 {
2156   struct addrinfo ai_hints;
2157   struct addrinfo *ai_res;
2158   struct addrinfo *ai_ptr;
2159   char addr_copy[NI_MAXHOST];
2160   char *addr;
2161   char *port;
2162   int status;
2163
2164   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2165   addr_copy[sizeof (addr_copy) - 1] = 0;
2166   addr = addr_copy;
2167
2168   memset (&ai_hints, 0, sizeof (ai_hints));
2169   ai_hints.ai_flags = 0;
2170 #ifdef AI_ADDRCONFIG
2171   ai_hints.ai_flags |= AI_ADDRCONFIG;
2172 #endif
2173   ai_hints.ai_family = AF_UNSPEC;
2174   ai_hints.ai_socktype = SOCK_STREAM;
2175
2176   port = NULL;
2177   if (*addr == '[') /* IPv6+port format */
2178   {
2179     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2180     addr++;
2181
2182     port = strchr (addr, ']');
2183     if (port == NULL)
2184     {
2185       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2186       return (-1);
2187     }
2188     *port = 0;
2189     port++;
2190
2191     if (*port == ':')
2192       port++;
2193     else if (*port == 0)
2194       port = NULL;
2195     else
2196     {
2197       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2198       return (-1);
2199     }
2200   } /* if (*addr = ']') */
2201   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2202   {
2203     port = rindex(addr, ':');
2204     if (port != NULL)
2205     {
2206       *port = 0;
2207       port++;
2208     }
2209   }
2210   ai_res = NULL;
2211   status = getaddrinfo (addr,
2212                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2213                         &ai_hints, &ai_res);
2214   if (status != 0)
2215   {
2216     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2217              addr, gai_strerror (status));
2218     return (-1);
2219   }
2220
2221   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2222   {
2223     int fd;
2224     listen_socket_t *temp;
2225     int one = 1;
2226
2227     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2228         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2229     if (temp == NULL)
2230     {
2231       fprintf (stderr,
2232                "rrdcached: open_listen_socket_network: realloc failed.\n");
2233       continue;
2234     }
2235     listen_fds = temp;
2236     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2237
2238     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2239     if (fd < 0)
2240     {
2241       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2242                rrd_strerror(errno));
2243       continue;
2244     }
2245
2246     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2247
2248     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2249     if (status != 0)
2250     {
2251       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2252                sock->addr, rrd_strerror(errno));
2253       close (fd);
2254       continue;
2255     }
2256
2257     status = listen (fd, /* backlog = */ 10);
2258     if (status != 0)
2259     {
2260       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2261                sock->addr, rrd_strerror(errno));
2262       close (fd);
2263       freeaddrinfo(ai_res);
2264       return (-1);
2265     }
2266
2267     listen_fds[listen_fds_num].fd = fd;
2268     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2269     listen_fds_num++;
2270   } /* for (ai_ptr) */
2271
2272   freeaddrinfo(ai_res);
2273   return (0);
2274 } /* }}} static int open_listen_socket_network */
2275
2276 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2277 {
2278   assert(sock != NULL);
2279   assert(sock->addr != NULL);
2280
2281   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2282       || sock->addr[0] == '/')
2283     return (open_listen_socket_unix(sock));
2284   else
2285     return (open_listen_socket_network(sock));
2286 } /* }}} int open_listen_socket */
2287
2288 static int close_listen_sockets (void) /* {{{ */
2289 {
2290   size_t i;
2291
2292   for (i = 0; i < listen_fds_num; i++)
2293   {
2294     close (listen_fds[i].fd);
2295
2296     if (listen_fds[i].family == PF_UNIX)
2297       unlink(listen_fds[i].addr);
2298   }
2299
2300   free (listen_fds);
2301   listen_fds = NULL;
2302   listen_fds_num = 0;
2303
2304   return (0);
2305 } /* }}} int close_listen_sockets */
2306
2307 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2308 {
2309   struct pollfd *pollfds;
2310   int pollfds_num;
2311   int status;
2312   int i;
2313
2314   if (listen_fds_num < 1)
2315   {
2316     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2317     return (NULL);
2318   }
2319
2320   pollfds_num = listen_fds_num;
2321   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2322   if (pollfds == NULL)
2323   {
2324     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2325     return (NULL);
2326   }
2327   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2328
2329   RRDD_LOG(LOG_INFO, "listening for connections");
2330
2331   while (state == RUNNING)
2332   {
2333     for (i = 0; i < pollfds_num; i++)
2334     {
2335       pollfds[i].fd = listen_fds[i].fd;
2336       pollfds[i].events = POLLIN | POLLPRI;
2337       pollfds[i].revents = 0;
2338     }
2339
2340     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2341     if (state != RUNNING)
2342       break;
2343     else if (status == 0) /* timeout */
2344       continue;
2345     else if (status < 0) /* error */
2346     {
2347       status = errno;
2348       if (status != EINTR)
2349       {
2350         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2351       }
2352       continue;
2353     }
2354
2355     for (i = 0; i < pollfds_num; i++)
2356     {
2357       listen_socket_t *client_sock;
2358       struct sockaddr_storage client_sa;
2359       socklen_t client_sa_size;
2360       pthread_t tid;
2361       pthread_attr_t attr;
2362
2363       if (pollfds[i].revents == 0)
2364         continue;
2365
2366       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2367       {
2368         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2369             "poll(2) returned something unexpected for listen FD #%i.",
2370             pollfds[i].fd);
2371         continue;
2372       }
2373
2374       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2375       if (client_sock == NULL)
2376       {
2377         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2378         continue;
2379       }
2380       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2381
2382       client_sa_size = sizeof (client_sa);
2383       client_sock->fd = accept (pollfds[i].fd,
2384           (struct sockaddr *) &client_sa, &client_sa_size);
2385       if (client_sock->fd < 0)
2386       {
2387         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2388         free(client_sock);
2389         continue;
2390       }
2391
2392       pthread_attr_init (&attr);
2393       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2394
2395       status = pthread_create (&tid, &attr, connection_thread_main,
2396                                client_sock);
2397       if (status != 0)
2398       {
2399         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2400         close_connection(client_sock);
2401         continue;
2402       }
2403     } /* for (pollfds_num) */
2404   } /* while (state == RUNNING) */
2405
2406   RRDD_LOG(LOG_INFO, "starting shutdown");
2407
2408   close_listen_sockets ();
2409
2410   pthread_mutex_lock (&connection_threads_lock);
2411   while (connection_threads_num > 0)
2412     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2413   pthread_mutex_unlock (&connection_threads_lock);
2414
2415   free(pollfds);
2416
2417   return (NULL);
2418 } /* }}} void *listen_thread_main */
2419
2420 static int daemonize (void) /* {{{ */
2421 {
2422   int pid_fd;
2423   char *base_dir;
2424
2425   daemon_uid = geteuid();
2426
2427   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2428   if (pid_fd < 0)
2429     pid_fd = check_pidfile();
2430   if (pid_fd < 0)
2431     return pid_fd;
2432
2433   /* open all the listen sockets */
2434   if (config_listen_address_list_len > 0)
2435   {
2436     for (size_t i = 0; i < config_listen_address_list_len; i++)
2437       open_listen_socket (config_listen_address_list[i]);
2438
2439     rrd_free_ptrs((void ***) &config_listen_address_list,
2440                   &config_listen_address_list_len);
2441   }
2442   else
2443   {
2444     listen_socket_t sock;
2445     memset(&sock, 0, sizeof(sock));
2446     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2447     open_listen_socket (&sock);
2448   }
2449
2450   if (listen_fds_num < 1)
2451   {
2452     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2453     goto error;
2454   }
2455
2456   if (!stay_foreground)
2457   {
2458     pid_t child;
2459
2460     child = fork ();
2461     if (child < 0)
2462     {
2463       fprintf (stderr, "daemonize: fork(2) failed.\n");
2464       goto error;
2465     }
2466     else if (child > 0)
2467       exit(0);
2468
2469     /* Become session leader */
2470     setsid ();
2471
2472     /* Open the first three file descriptors to /dev/null */
2473     close (2);
2474     close (1);
2475     close (0);
2476
2477     open ("/dev/null", O_RDWR);
2478     if (dup(0) == -1 || dup(0) == -1){
2479         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2480     }
2481   } /* if (!stay_foreground) */
2482
2483   /* Change into the /tmp directory. */
2484   base_dir = (config_base_dir != NULL)
2485     ? config_base_dir
2486     : "/tmp";
2487
2488   if (chdir (base_dir) != 0)
2489   {
2490     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2491     goto error;
2492   }
2493
2494   install_signal_handlers();
2495
2496   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2497   RRDD_LOG(LOG_INFO, "starting up");
2498
2499   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2500                                 (GDestroyNotify) free_cache_item);
2501   if (cache_tree == NULL)
2502   {
2503     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2504     goto error;
2505   }
2506
2507   return write_pidfile (pid_fd);
2508
2509 error:
2510   remove_pidfile();
2511   return -1;
2512 } /* }}} int daemonize */
2513
2514 static int cleanup (void) /* {{{ */
2515 {
2516   pthread_cond_broadcast (&flush_cond);
2517   pthread_join (flush_thread, NULL);
2518
2519   pthread_cond_broadcast (&queue_cond);
2520   for (int i = 0; i < config_queue_threads; i++)
2521     pthread_join (queue_threads[i], NULL);
2522
2523   if (config_flush_at_shutdown)
2524   {
2525     assert(cache_queue_head == NULL);
2526     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2527   }
2528
2529   journal_done();
2530   remove_pidfile ();
2531
2532   free(queue_threads);
2533   free(config_base_dir);
2534   free(config_pid_file);
2535   free(journal_cur);
2536   free(journal_old);
2537
2538   pthread_mutex_lock(&cache_lock);
2539   g_tree_destroy(cache_tree);
2540
2541   RRDD_LOG(LOG_INFO, "goodbye");
2542   closelog ();
2543
2544   return (0);
2545 } /* }}} int cleanup */
2546
2547 static int read_options (int argc, char **argv) /* {{{ */
2548 {
2549   int option;
2550   int status = 0;
2551
2552   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2553   {
2554     switch (option)
2555     {
2556       case 'g':
2557         stay_foreground=1;
2558         break;
2559
2560       case 'L':
2561       case 'l':
2562       {
2563         listen_socket_t *new;
2564
2565         new = malloc(sizeof(listen_socket_t));
2566         if (new == NULL)
2567         {
2568           fprintf(stderr, "read_options: malloc failed.\n");
2569           return(2);
2570         }
2571         memset(new, 0, sizeof(listen_socket_t));
2572
2573         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2574         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2575
2576         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2577                          &config_listen_address_list_len, new))
2578         {
2579           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2580           return (2);
2581         }
2582       }
2583       break;
2584
2585       case 'f':
2586       {
2587         int temp;
2588
2589         temp = atoi (optarg);
2590         if (temp > 0)
2591           config_flush_interval = temp;
2592         else
2593         {
2594           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2595           status = 3;
2596         }
2597       }
2598       break;
2599
2600       case 'w':
2601       {
2602         int temp;
2603
2604         temp = atoi (optarg);
2605         if (temp > 0)
2606           config_write_interval = temp;
2607         else
2608         {
2609           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2610           status = 2;
2611         }
2612       }
2613       break;
2614
2615       case 'z':
2616       {
2617         int temp;
2618
2619         temp = atoi(optarg);
2620         if (temp > 0)
2621           config_write_jitter = temp;
2622         else
2623         {
2624           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2625           status = 2;
2626         }
2627
2628         break;
2629       }
2630
2631       case 't':
2632       {
2633         int threads;
2634         threads = atoi(optarg);
2635         if (threads >= 1)
2636           config_queue_threads = threads;
2637         else
2638         {
2639           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2640           return 1;
2641         }
2642       }
2643       break;
2644
2645       case 'B':
2646         config_write_base_only = 1;
2647         break;
2648
2649       case 'b':
2650       {
2651         size_t len;
2652         char base_realpath[PATH_MAX];
2653
2654         if (config_base_dir != NULL)
2655           free (config_base_dir);
2656         config_base_dir = strdup (optarg);
2657         if (config_base_dir == NULL)
2658         {
2659           fprintf (stderr, "read_options: strdup failed.\n");
2660           return (3);
2661         }
2662
2663         /* make sure that the base directory is not resolved via
2664          * symbolic links.  this makes some performance-enhancing
2665          * assumptions possible (we don't have to resolve paths
2666          * that start with a "/")
2667          */
2668         if (realpath(config_base_dir, base_realpath) == NULL)
2669         {
2670           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2671           return 5;
2672         }
2673         else if (strncmp(config_base_dir,
2674                          base_realpath, sizeof(base_realpath)) != 0)
2675         {
2676           fprintf(stderr,
2677                   "Base directory (-b) resolved via file system links!\n"
2678                   "Please consult rrdcached '-b' documentation!\n"
2679                   "Consider specifying the real directory (%s)\n",
2680                   base_realpath);
2681           return 5;
2682         }
2683
2684         len = strlen (config_base_dir);
2685         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2686         {
2687           config_base_dir[len - 1] = 0;
2688           len--;
2689         }
2690
2691         if (len < 1)
2692         {
2693           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2694           return (4);
2695         }
2696
2697         _config_base_dir_len = len;
2698       }
2699       break;
2700
2701       case 'p':
2702       {
2703         if (config_pid_file != NULL)
2704           free (config_pid_file);
2705         config_pid_file = strdup (optarg);
2706         if (config_pid_file == NULL)
2707         {
2708           fprintf (stderr, "read_options: strdup failed.\n");
2709           return (3);
2710         }
2711       }
2712       break;
2713
2714       case 'F':
2715         config_flush_at_shutdown = 1;
2716         break;
2717
2718       case 'j':
2719       {
2720         struct stat statbuf;
2721         const char *dir = optarg;
2722
2723         status = stat(dir, &statbuf);
2724         if (status != 0)
2725         {
2726           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2727           return 6;
2728         }
2729
2730         if (!S_ISDIR(statbuf.st_mode)
2731             || access(dir, R_OK|W_OK|X_OK) != 0)
2732         {
2733           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2734                   errno ? rrd_strerror(errno) : "");
2735           return 6;
2736         }
2737
2738         journal_cur = malloc(PATH_MAX + 1);
2739         journal_old = malloc(PATH_MAX + 1);
2740         if (journal_cur == NULL || journal_old == NULL)
2741         {
2742           fprintf(stderr, "malloc failure for journal files\n");
2743           return 6;
2744         }
2745         else 
2746         {
2747           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2748           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2749         }
2750       }
2751       break;
2752
2753       case 'h':
2754       case '?':
2755         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2756             "\n"
2757             "Usage: rrdcached [options]\n"
2758             "\n"
2759             "Valid options are:\n"
2760             "  -l <address>  Socket address to listen to.\n"
2761             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2762             "  -w <seconds>  Interval in which to write data.\n"
2763             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2764             "  -t <threads>  Number of write threads.\n"
2765             "  -f <seconds>  Interval in which to flush dead data.\n"
2766             "  -p <file>     Location of the PID-file.\n"
2767             "  -b <dir>      Base directory to change to.\n"
2768             "  -B            Restrict file access to paths within -b <dir>\n"
2769             "  -g            Do not fork and run in the foreground.\n"
2770             "  -j <dir>      Directory in which to create the journal files.\n"
2771             "  -F            Always flush all updates at shutdown\n"
2772             "\n"
2773             "For more information and a detailed description of all options "
2774             "please refer\n"
2775             "to the rrdcached(1) manual page.\n",
2776             VERSION);
2777         status = -1;
2778         break;
2779     } /* switch (option) */
2780   } /* while (getopt) */
2781
2782   /* advise the user when values are not sane */
2783   if (config_flush_interval < 2 * config_write_interval)
2784     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2785             " 2x write interval (-w) !\n");
2786   if (config_write_jitter > config_write_interval)
2787     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2788             " write interval (-w) !\n");
2789
2790   if (config_write_base_only && config_base_dir == NULL)
2791     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2792             "  Consult the rrdcached documentation\n");
2793
2794   if (journal_cur == NULL)
2795     config_flush_at_shutdown = 1;
2796
2797   return (status);
2798 } /* }}} int read_options */
2799
2800 int main (int argc, char **argv)
2801 {
2802   int status;
2803
2804   status = read_options (argc, argv);
2805   if (status != 0)
2806   {
2807     if (status < 0)
2808       status = 0;
2809     return (status);
2810   }
2811
2812   status = daemonize ();
2813   if (status != 0)
2814   {
2815     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2816     return (1);
2817   }
2818
2819   journal_init();
2820
2821   /* start the queue threads */
2822   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2823   if (queue_threads == NULL)
2824   {
2825     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2826     cleanup();
2827     return (1);
2828   }
2829   for (int i = 0; i < config_queue_threads; i++)
2830   {
2831     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2832     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2833     if (status != 0)
2834     {
2835       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2836       cleanup();
2837       return (1);
2838     }
2839   }
2840
2841   /* start the flush thread */
2842   memset(&flush_thread, 0, sizeof(flush_thread));
2843   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2844   if (status != 0)
2845   {
2846     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2847     cleanup();
2848     return (1);
2849   }
2850
2851   listen_thread_main (NULL);
2852   cleanup ();
2853
2854   return (0);
2855 } /* int main */
2856
2857 /*
2858  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2859  */