rrdcached: journal_replay default failure reason, just in case
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
85
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
100
101 #include <glib-2.0/glib.h>
102 /* }}} */
103
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
105
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
109
110 /*
111  * Types
112  */
113 typedef enum
114 {
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
118
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
127
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
131
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
136
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
141
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
158
159 struct callback_flush_data_s
160 {
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
167
168 enum queue_side_e
169 {
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
174
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
178
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
184
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
187
188 static int do_shutdown = 0;
189
190 static pthread_t queue_thread;
191
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
195
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
202
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
211
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
214
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
223
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
232
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
242
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
247
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
252
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
258
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
264
265 static void install_signal_handlers(void) /* {{{ */
266 {
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
274
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
279
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
283
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
287
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
291
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
295
296 } /* }}} void install_signal_handlers */
297
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300   int fd;
301   char *file;
302
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
306
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
311
312   return(fd);
313 } /* }}} static int open_pidfile */
314
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
317 {
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
321
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
325
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
328
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
332
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
342
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
345
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
349
350   return pid_fd;
351 } /* }}} static int check_pidfile */
352
353 static int write_pidfile (int fd) /* {{{ */
354 {
355   pid_t pid;
356   FILE *fh;
357
358   pid = getpid ();
359
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
367
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
370
371   return (0);
372 } /* }}} int write_pidfile */
373
374 static int remove_pidfile (void) /* {{{ */
375 {
376   char *file;
377   int status;
378
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
382
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
388
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391   char *eol;
392
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
395
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
410
411     sock->next_cmd = eol - sock->rbuf + 1;
412
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
415
416     *len = eol - cmd;
417
418     return cmd;
419   }
420
421   /* NOTREACHED */
422   assert(1==0);
423 }
424
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428   char *new_buf;
429
430   assert(sock != NULL);
431
432   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
438
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
440
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
443
444   return 0;
445 } /* }}} static int add_to_wbuf */
446
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
453
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
456
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
469
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
472
473 static int count_lines(char *str) /* {{{ */
474 {
475   int lines = 0;
476
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
485
486   return lines;
487 } /* }}} static int count_lines */
488
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
494 {
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
500
501   if (sock == NULL) return rc;  /* journal replay mode */
502
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
513
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
524
525   len += rclen;
526
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
530
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
537
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
552
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
555
556   return 0;
557 } /* }}} */
558
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561   ci->values = NULL;
562   ci->values_num = 0;
563
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
567 }
568
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
577
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
582
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
587
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
591
592 /* free the resources associated with the cache_item_t
593  * must hold cache_lock when calling this function
594  */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
596 {
597   if (ci == NULL) return NULL;
598
599   remove_from_queue(ci);
600
601   for (int i=0; i < ci->values_num; i++)
602     free(ci->values[i]);
603
604   free (ci->values);
605   free (ci->file);
606
607   /* in case anyone is waiting */
608   pthread_cond_broadcast(&ci->flushed);
609
610   free (ci);
611
612   return NULL;
613 } /* }}} static void *free_cache_item */
614
615 /*
616  * enqueue_cache_item:
617  * `cache_lock' must be acquired before calling this function!
618  */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620     queue_side_t side)
621 {
622   if (ci == NULL)
623     return (-1);
624
625   if (ci->values_num == 0)
626     return (0);
627
628   if (side == HEAD)
629   {
630     if (cache_queue_head == ci)
631       return 0;
632
633     /* remove if further down in queue */
634     remove_from_queue(ci);
635
636     ci->prev = NULL;
637     ci->next = cache_queue_head;
638     if (ci->next != NULL)
639       ci->next->prev = ci;
640     cache_queue_head = ci;
641
642     if (cache_queue_tail == NULL)
643       cache_queue_tail = cache_queue_head;
644   }
645   else /* (side == TAIL) */
646   {
647     /* We don't move values back in the list.. */
648     if (ci->flags & CI_FLAGS_IN_QUEUE)
649       return (0);
650
651     assert (ci->next == NULL);
652     assert (ci->prev == NULL);
653
654     ci->prev = cache_queue_tail;
655
656     if (cache_queue_tail == NULL)
657       cache_queue_head = ci;
658     else
659       cache_queue_tail->next = ci;
660
661     cache_queue_tail = ci;
662   }
663
664   ci->flags |= CI_FLAGS_IN_QUEUE;
665
666   pthread_cond_broadcast(&cache_cond);
667   pthread_mutex_lock (&stats_lock);
668   stats_queue_length++;
669   pthread_mutex_unlock (&stats_lock);
670
671   return (0);
672 } /* }}} int enqueue_cache_item */
673
674 /*
675  * tree_callback_flush:
676  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677  * while this is in progress.
678  */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680     gpointer data)
681 {
682   cache_item_t *ci;
683   callback_flush_data_t *cfd;
684
685   ci = (cache_item_t *) value;
686   cfd = (callback_flush_data_t *) data;
687
688   if (ci->flags & CI_FLAGS_IN_QUEUE)
689     return FALSE;
690
691   if ((ci->last_flush_time <= cfd->abs_timeout)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if ((do_shutdown != 0)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702       && (ci->values_num <= 0))
703   {
704     char **temp;
705
706     temp = (char **) realloc (cfd->keys,
707         sizeof (char *) * (cfd->keys_num + 1));
708     if (temp == NULL)
709     {
710       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711       return (FALSE);
712     }
713     cfd->keys = temp;
714     /* Make really sure this points to the _same_ place */
715     assert ((char *) key == ci->file);
716     cfd->keys[cfd->keys_num] = (char *) key;
717     cfd->keys_num++;
718   }
719
720   return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
722
723 static int flush_old_values (int max_age)
724 {
725   callback_flush_data_t cfd;
726   size_t k;
727
728   memset (&cfd, 0, sizeof (cfd));
729   /* Pass the current time as user data so that we don't need to call
730    * `time' for each node. */
731   cfd.now = time (NULL);
732   cfd.keys = NULL;
733   cfd.keys_num = 0;
734
735   if (max_age > 0)
736     cfd.abs_timeout = cfd.now - max_age;
737   else
738     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
739
740   /* `tree_callback_flush' will return the keys of all values that haven't
741    * been touched in the last `config_flush_interval' seconds in `cfd'.
742    * The char*'s in this array point to the same memory as ci->file, so we
743    * don't need to free them separately. */
744   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
745
746   for (k = 0; k < cfd.keys_num; k++)
747   {
748     /* should never fail, since we have held the cache_lock
749      * the entire time */
750     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751   }
752
753   if (cfd.keys != NULL)
754   {
755     free (cfd.keys);
756     cfd.keys = NULL;
757   }
758
759   return (0);
760 } /* int flush_old_values */
761
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
763 {
764   struct timeval now;
765   struct timespec next_flush;
766   int final_flush = 0; /* make sure we only flush once on shutdown */
767
768   gettimeofday (&now, NULL);
769   next_flush.tv_sec = now.tv_sec + config_flush_interval;
770   next_flush.tv_nsec = 1000 * now.tv_usec;
771
772   pthread_mutex_lock (&cache_lock);
773   while ((do_shutdown == 0) || (cache_queue_head != NULL))
774   {
775     cache_item_t *ci;
776     char *file;
777     char **values;
778     int values_num;
779     int status;
780     int i;
781
782     /* First, check if it's time to do the cache flush. */
783     gettimeofday (&now, NULL);
784     if ((now.tv_sec > next_flush.tv_sec)
785         || ((now.tv_sec == next_flush.tv_sec)
786           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787     {
788       /* Flush all values that haven't been written in the last
789        * `config_write_interval' seconds. */
790       flush_old_values (config_write_interval);
791
792       /* Determine the time of the next cache flush. */
793       next_flush.tv_sec =
794         now.tv_sec + next_flush.tv_sec % config_flush_interval;
795
796       /* unlock the cache while we rotate so we don't block incoming
797        * updates if the fsync() blocks on disk I/O */
798       pthread_mutex_unlock(&cache_lock);
799       journal_rotate();
800       pthread_mutex_lock(&cache_lock);
801     }
802
803     /* Now, check if there's something to store away. If not, wait until
804      * something comes in or it's time to do the cache flush.  if we are
805      * shutting down, do not wait around.  */
806     if (cache_queue_head == NULL && !do_shutdown)
807     {
808       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809       if ((status != 0) && (status != ETIMEDOUT))
810       {
811         RRDD_LOG (LOG_ERR, "queue_thread_main: "
812             "pthread_cond_timedwait returned %i.", status);
813       }
814     }
815
816     /* We're about to shut down */
817     if (do_shutdown != 0 && !final_flush++)
818     {
819       if (config_flush_at_shutdown)
820         flush_old_values (-1); /* flush everything */
821       else
822         break;
823     }
824
825     /* Check if a value has arrived. This may be NULL if we timed out or there
826      * was an interrupt such as a signal. */
827     if (cache_queue_head == NULL)
828       continue;
829
830     ci = cache_queue_head;
831
832     /* copy the relevant parts */
833     file = strdup (ci->file);
834     if (file == NULL)
835     {
836       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837       continue;
838     }
839
840     assert(ci->values != NULL);
841     assert(ci->values_num > 0);
842
843     values = ci->values;
844     values_num = ci->values_num;
845
846     wipe_ci_values(ci, time(NULL));
847     remove_from_queue(ci);
848
849     pthread_mutex_lock (&stats_lock);
850     assert (stats_queue_length > 0);
851     stats_queue_length--;
852     pthread_mutex_unlock (&stats_lock);
853
854     pthread_mutex_unlock (&cache_lock);
855
856     rrd_clear_error ();
857     status = rrd_update_r (file, NULL, values_num, (void *) values);
858     if (status != 0)
859     {
860       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861           "rrd_update_r (%s) failed with status %i. (%s)",
862           file, status, rrd_get_error());
863     }
864
865     journal_write("wrote", file);
866     pthread_cond_broadcast(&ci->flushed);
867
868     for (i = 0; i < values_num; i++)
869       free (values[i]);
870
871     free(values);
872     free(file);
873
874     if (status == 0)
875     {
876       pthread_mutex_lock (&stats_lock);
877       stats_updates_written++;
878       stats_data_sets_written += values_num;
879       pthread_mutex_unlock (&stats_lock);
880     }
881
882     pthread_mutex_lock (&cache_lock);
883
884     /* We're about to shut down */
885     if (do_shutdown != 0 && !final_flush++)
886     {
887       if (config_flush_at_shutdown)
888           flush_old_values (-1); /* flush everything */
889       else
890         break;
891     }
892   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893   pthread_mutex_unlock (&cache_lock);
894
895   if (config_flush_at_shutdown)
896   {
897     assert(cache_queue_head == NULL);
898     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899   }
900
901   journal_done();
902
903   return (NULL);
904 } /* }}} void *queue_thread_main */
905
906 static int buffer_get_field (char **buffer_ret, /* {{{ */
907     size_t *buffer_size_ret, char **field_ret)
908 {
909   char *buffer;
910   size_t buffer_pos;
911   size_t buffer_size;
912   char *field;
913   size_t field_size;
914   int status;
915
916   buffer = *buffer_ret;
917   buffer_pos = 0;
918   buffer_size = *buffer_size_ret;
919   field = *buffer_ret;
920   field_size = 0;
921
922   if (buffer_size <= 0)
923     return (-1);
924
925   /* This is ensured by `handle_request'. */
926   assert (buffer[buffer_size - 1] == '\0');
927
928   status = -1;
929   while (buffer_pos < buffer_size)
930   {
931     /* Check for end-of-field or end-of-buffer */
932     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
933     {
934       field[field_size] = 0;
935       field_size++;
936       buffer_pos++;
937       status = 0;
938       break;
939     }
940     /* Handle escaped characters. */
941     else if (buffer[buffer_pos] == '\\')
942     {
943       if (buffer_pos >= (buffer_size - 1))
944         break;
945       buffer_pos++;
946       field[field_size] = buffer[buffer_pos];
947       field_size++;
948       buffer_pos++;
949     }
950     /* Normal operation */ 
951     else
952     {
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957   } /* while (buffer_pos < buffer_size) */
958
959   if (status != 0)
960     return (status);
961
962   *buffer_ret = buffer + buffer_pos;
963   *buffer_size_ret = buffer_size - buffer_pos;
964   *field_ret = field;
965
966   return (0);
967 } /* }}} int buffer_get_field */
968
969 /* if we're restricting writes to the base directory,
970  * check whether the file falls within the dir
971  * returns 1 if OK, otherwise 0
972  */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
974 {
975   assert(file != NULL);
976
977   if (!config_write_base_only
978       || sock == NULL /* journal replay */
979       || config_base_dir == NULL)
980     return 1;
981
982   if (strstr(file, "../") != NULL) goto err;
983
984   /* relative paths without "../" are ok */
985   if (*file != '/') return 1;
986
987   /* file must be of the format base + "/" + <1+ char filename> */
988   if (strlen(file) < _config_base_dir_len + 2) goto err;
989   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990   if (*(file + _config_base_dir_len) != '/') goto err;
991
992   return 1;
993
994 err:
995   if (sock != NULL && sock->fd >= 0)
996     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
997
998   return 0;
999 } /* }}} static int check_file_access */
1000
1001 /* when using a base dir, convert relative paths to absolute paths.
1002  * if necessary, modifies the "filename" pointer to point
1003  * to the new path created in "tmp".  "tmp" is provided
1004  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005  *
1006  * this allows us to optimize for the expected case (absolute path)
1007  * with a no-op.
1008  */
1009 static void get_abs_path(char **filename, char *tmp)
1010 {
1011   assert(tmp != NULL);
1012   assert(filename != NULL && *filename != NULL);
1013
1014   if (config_base_dir == NULL || **filename == '/')
1015     return;
1016
1017   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018   *filename = tmp;
1019 } /* }}} static int get_abs_path */
1020
1021 /* returns 1 if we have the required privilege level,
1022  * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024                           socket_privilege priv)
1025 {
1026   if (sock == NULL) /* journal replay */
1027     return 1;
1028
1029   if (sock->privilege >= priv)
1030     return 1;
1031
1032   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1034
1035 static int flush_file (const char *filename) /* {{{ */
1036 {
1037   cache_item_t *ci;
1038
1039   pthread_mutex_lock (&cache_lock);
1040
1041   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042   if (ci == NULL)
1043   {
1044     pthread_mutex_unlock (&cache_lock);
1045     return (ENOENT);
1046   }
1047
1048   if (ci->values_num > 0)
1049   {
1050     /* Enqueue at head */
1051     enqueue_cache_item (ci, HEAD);
1052     pthread_cond_wait(&ci->flushed, &cache_lock);
1053   }
1054
1055   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1056    * may have been purged during our cond_wait() */
1057
1058   pthread_mutex_unlock(&cache_lock);
1059
1060   return (0);
1061 } /* }}} int flush_file */
1062
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064     char *buffer, size_t buffer_size)
1065 {
1066   int status;
1067   char **help_text;
1068   char *command;
1069
1070   char *help_help[2] =
1071   {
1072     "Command overview\n"
1073     ,
1074     "HELP [<command>]\n"
1075     "FLUSH <filename>\n"
1076     "FLUSHALL\n"
1077     "PENDING <filename>\n"
1078     "FORGET <filename>\n"
1079     "UPDATE <filename> <values> [<values> ...]\n"
1080     "BATCH\n"
1081     "STATS\n"
1082   };
1083
1084   char *help_flush[2] =
1085   {
1086     "Help for FLUSH\n"
1087     ,
1088     "Usage: FLUSH <filename>\n"
1089     "\n"
1090     "Adds the given filename to the head of the update queue and returns\n"
1091     "after is has been dequeued.\n"
1092   };
1093
1094   char *help_flushall[2] =
1095   {
1096     "Help for FLUSHALL\n"
1097     ,
1098     "Usage: FLUSHALL\n"
1099     "\n"
1100     "Triggers writing of all pending updates.  Returns immediately.\n"
1101   };
1102
1103   char *help_pending[2] =
1104   {
1105     "Help for PENDING\n"
1106     ,
1107     "Usage: PENDING <filename>\n"
1108     "\n"
1109     "Shows any 'pending' updates for a file, in order.\n"
1110     "The updates shown have not yet been written to the underlying RRD file.\n"
1111   };
1112
1113   char *help_forget[2] =
1114   {
1115     "Help for FORGET\n"
1116     ,
1117     "Usage: FORGET <filename>\n"
1118     "\n"
1119     "Removes the file completely from the cache.\n"
1120     "Any pending updates for the file will be lost.\n"
1121   };
1122
1123   char *help_update[2] =
1124   {
1125     "Help for UPDATE\n"
1126     ,
1127     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1128     "\n"
1129     "Adds the given file to the internal cache if it is not yet known and\n"
1130     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1131     "for details.\n"
1132     "\n"
1133     "Each <values> has the following form:\n"
1134     "  <values> = <time>:<value>[:<value>[...]]\n"
1135     "See the rrdupdate(1) manpage for details.\n"
1136   };
1137
1138   char *help_stats[2] =
1139   {
1140     "Help for STATS\n"
1141     ,
1142     "Usage: STATS\n"
1143     "\n"
1144     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1145     "a description of the values.\n"
1146   };
1147
1148   char *help_batch[2] =
1149   {
1150     "Help for BATCH\n"
1151     ,
1152     "The 'BATCH' command permits the client to initiate a bulk load\n"
1153     "   of commands to rrdcached.\n"
1154     "\n"
1155     "Usage:\n"
1156     "\n"
1157     "    client: BATCH\n"
1158     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1159     "    client: command #1\n"
1160     "    client: command #2\n"
1161     "    client: ... and so on\n"
1162     "    client: .\n"
1163     "    server: 2 errors\n"
1164     "    server: 7 message for command #7\n"
1165     "    server: 9 message for command #9\n"
1166     "\n"
1167     "For more information, consult the rrdcached(1) documentation.\n"
1168   };
1169
1170   status = buffer_get_field (&buffer, &buffer_size, &command);
1171   if (status != 0)
1172     help_text = help_help;
1173   else
1174   {
1175     if (strcasecmp (command, "update") == 0)
1176       help_text = help_update;
1177     else if (strcasecmp (command, "flush") == 0)
1178       help_text = help_flush;
1179     else if (strcasecmp (command, "flushall") == 0)
1180       help_text = help_flushall;
1181     else if (strcasecmp (command, "pending") == 0)
1182       help_text = help_pending;
1183     else if (strcasecmp (command, "forget") == 0)
1184       help_text = help_forget;
1185     else if (strcasecmp (command, "stats") == 0)
1186       help_text = help_stats;
1187     else if (strcasecmp (command, "batch") == 0)
1188       help_text = help_batch;
1189     else
1190       help_text = help_help;
1191   }
1192
1193   add_response_info(sock, help_text[1]);
1194   return send_response(sock, RESP_OK, help_text[0]);
1195 } /* }}} int handle_request_help */
1196
1197 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1198 {
1199   uint64_t copy_queue_length;
1200   uint64_t copy_updates_received;
1201   uint64_t copy_flush_received;
1202   uint64_t copy_updates_written;
1203   uint64_t copy_data_sets_written;
1204   uint64_t copy_journal_bytes;
1205   uint64_t copy_journal_rotate;
1206
1207   uint64_t tree_nodes_number;
1208   uint64_t tree_depth;
1209
1210   pthread_mutex_lock (&stats_lock);
1211   copy_queue_length       = stats_queue_length;
1212   copy_updates_received   = stats_updates_received;
1213   copy_flush_received     = stats_flush_received;
1214   copy_updates_written    = stats_updates_written;
1215   copy_data_sets_written  = stats_data_sets_written;
1216   copy_journal_bytes      = stats_journal_bytes;
1217   copy_journal_rotate     = stats_journal_rotate;
1218   pthread_mutex_unlock (&stats_lock);
1219
1220   pthread_mutex_lock (&cache_lock);
1221   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1222   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1223   pthread_mutex_unlock (&cache_lock);
1224
1225   add_response_info(sock,
1226                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1227   add_response_info(sock,
1228                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1229   add_response_info(sock,
1230                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1231   add_response_info(sock,
1232                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1233   add_response_info(sock,
1234                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1235   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1236   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1237   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1238   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1239
1240   send_response(sock, RESP_OK, "Statistics follow\n");
1241
1242   return (0);
1243 } /* }}} int handle_request_stats */
1244
1245 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1246     char *buffer, size_t buffer_size)
1247 {
1248   char *file, file_tmp[PATH_MAX];
1249   int status;
1250
1251   status = buffer_get_field (&buffer, &buffer_size, &file);
1252   if (status != 0)
1253   {
1254     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1255   }
1256   else
1257   {
1258     pthread_mutex_lock(&stats_lock);
1259     stats_flush_received++;
1260     pthread_mutex_unlock(&stats_lock);
1261
1262     get_abs_path(&file, file_tmp);
1263     if (!check_file_access(file, sock)) return 0;
1264
1265     status = flush_file (file);
1266     if (status == 0)
1267       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1268     else if (status == ENOENT)
1269     {
1270       /* no file in our tree; see whether it exists at all */
1271       struct stat statbuf;
1272
1273       memset(&statbuf, 0, sizeof(statbuf));
1274       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1275         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1276       else
1277         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1278     }
1279     else if (status < 0)
1280       return send_response(sock, RESP_ERR, "Internal error.\n");
1281     else
1282       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1283   }
1284
1285   /* NOTREACHED */
1286   assert(1==0);
1287 } /* }}} int handle_request_flush */
1288
1289 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1290 {
1291   int status;
1292
1293   status = has_privilege(sock, PRIV_HIGH);
1294   if (status <= 0)
1295     return status;
1296
1297   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1298
1299   pthread_mutex_lock(&cache_lock);
1300   flush_old_values(-1);
1301   pthread_mutex_unlock(&cache_lock);
1302
1303   return send_response(sock, RESP_OK, "Started flush.\n");
1304 } /* }}} static int handle_request_flushall */
1305
1306 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1307                                   char *buffer, size_t buffer_size)
1308 {
1309   int status;
1310   char *file, file_tmp[PATH_MAX];
1311   cache_item_t *ci;
1312
1313   status = buffer_get_field(&buffer, &buffer_size, &file);
1314   if (status != 0)
1315     return send_response(sock, RESP_ERR,
1316                          "Usage: PENDING <filename>\n");
1317
1318   status = has_privilege(sock, PRIV_HIGH);
1319   if (status <= 0)
1320     return status;
1321
1322   get_abs_path(&file, file_tmp);
1323
1324   pthread_mutex_lock(&cache_lock);
1325   ci = g_tree_lookup(cache_tree, file);
1326   if (ci == NULL)
1327   {
1328     pthread_mutex_unlock(&cache_lock);
1329     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1330   }
1331
1332   for (int i=0; i < ci->values_num; i++)
1333     add_response_info(sock, "%s\n", ci->values[i]);
1334
1335   pthread_mutex_unlock(&cache_lock);
1336   return send_response(sock, RESP_OK, "updates pending\n");
1337 } /* }}} static int handle_request_pending */
1338
1339 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1340                                  char *buffer, size_t buffer_size)
1341 {
1342   int status;
1343   gboolean found;
1344   char *file, file_tmp[PATH_MAX];
1345
1346   status = buffer_get_field(&buffer, &buffer_size, &file);
1347   if (status != 0)
1348     return send_response(sock, RESP_ERR,
1349                          "Usage: FORGET <filename>\n");
1350
1351   status = has_privilege(sock, PRIV_HIGH);
1352   if (status <= 0)
1353     return status;
1354
1355   get_abs_path(&file, file_tmp);
1356   if (!check_file_access(file, sock)) return 0;
1357
1358   pthread_mutex_lock(&cache_lock);
1359   found = g_tree_remove(cache_tree, file);
1360   pthread_mutex_unlock(&cache_lock);
1361
1362   if (found == TRUE)
1363   {
1364     if (sock != NULL)
1365       journal_write("forget", file);
1366
1367     return send_response(sock, RESP_OK, "Gone!\n");
1368   }
1369   else
1370     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1371
1372   /* NOTREACHED */
1373   assert(1==0);
1374 } /* }}} static int handle_request_forget */
1375
1376 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1377                                   time_t now,
1378                                   char *buffer, size_t buffer_size)
1379 {
1380   char *file, file_tmp[PATH_MAX];
1381   int values_num = 0;
1382   int status;
1383   char orig_buf[CMD_MAX];
1384
1385   cache_item_t *ci;
1386
1387   status = has_privilege(sock, PRIV_HIGH);
1388   if (status <= 0)
1389     return status;
1390
1391   /* save it for the journal later */
1392   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1393
1394   status = buffer_get_field (&buffer, &buffer_size, &file);
1395   if (status != 0)
1396     return send_response(sock, RESP_ERR,
1397                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1398
1399   pthread_mutex_lock(&stats_lock);
1400   stats_updates_received++;
1401   pthread_mutex_unlock(&stats_lock);
1402
1403   get_abs_path(&file, file_tmp);
1404   if (!check_file_access(file, sock)) return 0;
1405
1406   pthread_mutex_lock (&cache_lock);
1407   ci = g_tree_lookup (cache_tree, file);
1408
1409   if (ci == NULL) /* {{{ */
1410   {
1411     struct stat statbuf;
1412
1413     /* don't hold the lock while we setup; stat(2) might block */
1414     pthread_mutex_unlock(&cache_lock);
1415
1416     memset (&statbuf, 0, sizeof (statbuf));
1417     status = stat (file, &statbuf);
1418     if (status != 0)
1419     {
1420       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1421
1422       status = errno;
1423       if (status == ENOENT)
1424         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1425       else
1426         return send_response(sock, RESP_ERR,
1427                              "stat failed with error %i.\n", status);
1428     }
1429     if (!S_ISREG (statbuf.st_mode))
1430       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1431
1432     if (access(file, R_OK|W_OK) != 0)
1433       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1434                            file, rrd_strerror(errno));
1435
1436     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1437     if (ci == NULL)
1438     {
1439       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1440
1441       return send_response(sock, RESP_ERR, "malloc failed.\n");
1442     }
1443     memset (ci, 0, sizeof (cache_item_t));
1444
1445     ci->file = strdup (file);
1446     if (ci->file == NULL)
1447     {
1448       free (ci);
1449       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1450
1451       return send_response(sock, RESP_ERR, "strdup failed.\n");
1452     }
1453
1454     wipe_ci_values(ci, now);
1455     ci->flags = CI_FLAGS_IN_TREE;
1456     pthread_cond_init(&ci->flushed, NULL);
1457
1458     pthread_mutex_lock(&cache_lock);
1459     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1460   } /* }}} */
1461   assert (ci != NULL);
1462
1463   /* don't re-write updates in replay mode */
1464   if (sock != NULL)
1465     journal_write("update", orig_buf);
1466
1467   while (buffer_size > 0)
1468   {
1469     char **temp;
1470     char *value;
1471     time_t stamp;
1472     char *eostamp;
1473
1474     status = buffer_get_field (&buffer, &buffer_size, &value);
1475     if (status != 0)
1476     {
1477       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1478       break;
1479     }
1480
1481     /* make sure update time is always moving forward */
1482     stamp = strtol(value, &eostamp, 10);
1483     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1484     {
1485       pthread_mutex_unlock(&cache_lock);
1486       return send_response(sock, RESP_ERR,
1487                            "Cannot find timestamp in '%s'!\n", value);
1488     }
1489     else if (stamp <= ci->last_update_stamp)
1490     {
1491       pthread_mutex_unlock(&cache_lock);
1492       return send_response(sock, RESP_ERR,
1493                            "illegal attempt to update using time %ld when last"
1494                            " update time is %ld (minimum one second step)\n",
1495                            stamp, ci->last_update_stamp);
1496     }
1497     else
1498       ci->last_update_stamp = stamp;
1499
1500     temp = (char **) realloc (ci->values,
1501         sizeof (char *) * (ci->values_num + 1));
1502     if (temp == NULL)
1503     {
1504       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1505       continue;
1506     }
1507     ci->values = temp;
1508
1509     ci->values[ci->values_num] = strdup (value);
1510     if (ci->values[ci->values_num] == NULL)
1511     {
1512       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1513       continue;
1514     }
1515     ci->values_num++;
1516
1517     values_num++;
1518   }
1519
1520   if (((now - ci->last_flush_time) >= config_write_interval)
1521       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1522       && (ci->values_num > 0))
1523   {
1524     enqueue_cache_item (ci, TAIL);
1525   }
1526
1527   pthread_mutex_unlock (&cache_lock);
1528
1529   if (values_num < 1)
1530     return send_response(sock, RESP_ERR, "No values updated.\n");
1531   else
1532     return send_response(sock, RESP_OK,
1533                          "errors, enqueued %i value(s).\n", values_num);
1534
1535   /* NOTREACHED */
1536   assert(1==0);
1537
1538 } /* }}} int handle_request_update */
1539
1540 /* we came across a "WROTE" entry during journal replay.
1541  * throw away any values that we have accumulated for this file
1542  */
1543 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1544 {
1545   int i;
1546   cache_item_t *ci;
1547   const char *file = buffer;
1548
1549   pthread_mutex_lock(&cache_lock);
1550
1551   ci = g_tree_lookup(cache_tree, file);
1552   if (ci == NULL)
1553   {
1554     pthread_mutex_unlock(&cache_lock);
1555     return (0);
1556   }
1557
1558   if (ci->values)
1559   {
1560     for (i=0; i < ci->values_num; i++)
1561       free(ci->values[i]);
1562
1563     free(ci->values);
1564   }
1565
1566   wipe_ci_values(ci, now);
1567   remove_from_queue(ci);
1568
1569   pthread_mutex_unlock(&cache_lock);
1570   return (0);
1571 } /* }}} int handle_request_wrote */
1572
1573 /* start "BATCH" processing */
1574 static int batch_start (listen_socket_t *sock) /* {{{ */
1575 {
1576   int status;
1577   if (sock->batch_start)
1578     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1579
1580   status = send_response(sock, RESP_OK,
1581                          "Go ahead.  End with dot '.' on its own line.\n");
1582   sock->batch_start = time(NULL);
1583   sock->batch_cmd = 0;
1584
1585   return status;
1586 } /* }}} static int batch_start */
1587
1588 /* finish "BATCH" processing and return results to the client */
1589 static int batch_done (listen_socket_t *sock) /* {{{ */
1590 {
1591   assert(sock->batch_start);
1592   sock->batch_start = 0;
1593   sock->batch_cmd  = 0;
1594   return send_response(sock, RESP_OK, "errors\n");
1595 } /* }}} static int batch_done */
1596
1597 /* if sock==NULL, we are in journal replay mode */
1598 static int handle_request (listen_socket_t *sock, /* {{{ */
1599                            time_t now,
1600                            char *buffer, size_t buffer_size)
1601 {
1602   char *buffer_ptr;
1603   char *command;
1604   int status;
1605
1606   assert (buffer[buffer_size - 1] == '\0');
1607
1608   buffer_ptr = buffer;
1609   command = NULL;
1610   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1611   if (status != 0)
1612   {
1613     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1614     return (-1);
1615   }
1616
1617   if (sock != NULL && sock->batch_start)
1618     sock->batch_cmd++;
1619
1620   if (strcasecmp (command, "update") == 0)
1621     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1622   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1623   {
1624     /* this is only valid in replay mode */
1625     return (handle_request_wrote (buffer_ptr, now));
1626   }
1627   else if (strcasecmp (command, "flush") == 0)
1628     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1629   else if (strcasecmp (command, "flushall") == 0)
1630     return (handle_request_flushall(sock));
1631   else if (strcasecmp (command, "pending") == 0)
1632     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1633   else if (strcasecmp (command, "forget") == 0)
1634     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1635   else if (strcasecmp (command, "stats") == 0)
1636     return (handle_request_stats (sock));
1637   else if (strcasecmp (command, "help") == 0)
1638     return (handle_request_help (sock, buffer_ptr, buffer_size));
1639   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1640     return batch_start(sock);
1641   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1642     return batch_done(sock);
1643   else if (strcasecmp (command, "quit") == 0)
1644     return -1;
1645   else
1646     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1647
1648   /* NOTREACHED */
1649   assert(1==0);
1650 } /* }}} int handle_request */
1651
1652 /* MUST NOT hold journal_lock before calling this */
1653 static void journal_rotate(void) /* {{{ */
1654 {
1655   FILE *old_fh = NULL;
1656   int new_fd;
1657
1658   if (journal_cur == NULL || journal_old == NULL)
1659     return;
1660
1661   pthread_mutex_lock(&journal_lock);
1662
1663   /* we rotate this way (rename before close) so that the we can release
1664    * the journal lock as fast as possible.  Journal writes to the new
1665    * journal can proceed immediately after the new file is opened.  The
1666    * fclose can then block without affecting new updates.
1667    */
1668   if (journal_fh != NULL)
1669   {
1670     old_fh = journal_fh;
1671     journal_fh = NULL;
1672     rename(journal_cur, journal_old);
1673     ++stats_journal_rotate;
1674   }
1675
1676   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1677                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1678   if (new_fd >= 0)
1679   {
1680     journal_fh = fdopen(new_fd, "a");
1681     if (journal_fh == NULL)
1682       close(new_fd);
1683   }
1684
1685   pthread_mutex_unlock(&journal_lock);
1686
1687   if (old_fh != NULL)
1688     fclose(old_fh);
1689
1690   if (journal_fh == NULL)
1691   {
1692     RRDD_LOG(LOG_CRIT,
1693              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1694              journal_cur, rrd_strerror(errno));
1695
1696     RRDD_LOG(LOG_ERR,
1697              "JOURNALING DISABLED: All values will be flushed at shutdown");
1698     config_flush_at_shutdown = 1;
1699   }
1700
1701 } /* }}} static void journal_rotate */
1702
1703 static void journal_done(void) /* {{{ */
1704 {
1705   if (journal_cur == NULL)
1706     return;
1707
1708   pthread_mutex_lock(&journal_lock);
1709   if (journal_fh != NULL)
1710   {
1711     fclose(journal_fh);
1712     journal_fh = NULL;
1713   }
1714
1715   if (config_flush_at_shutdown)
1716   {
1717     RRDD_LOG(LOG_INFO, "removing journals");
1718     unlink(journal_old);
1719     unlink(journal_cur);
1720   }
1721   else
1722   {
1723     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1724              "journals will be used at next startup");
1725   }
1726
1727   pthread_mutex_unlock(&journal_lock);
1728
1729 } /* }}} static void journal_done */
1730
1731 static int journal_write(char *cmd, char *args) /* {{{ */
1732 {
1733   int chars;
1734
1735   if (journal_fh == NULL)
1736     return 0;
1737
1738   pthread_mutex_lock(&journal_lock);
1739   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1740   pthread_mutex_unlock(&journal_lock);
1741
1742   if (chars > 0)
1743   {
1744     pthread_mutex_lock(&stats_lock);
1745     stats_journal_bytes += chars;
1746     pthread_mutex_unlock(&stats_lock);
1747   }
1748
1749   return chars;
1750 } /* }}} static int journal_write */
1751
1752 static int journal_replay (const char *file) /* {{{ */
1753 {
1754   FILE *fh;
1755   int entry_cnt = 0;
1756   int fail_cnt = 0;
1757   uint64_t line = 0;
1758   char entry[CMD_MAX];
1759   time_t now;
1760
1761   if (file == NULL) return 0;
1762
1763   {
1764     char *reason = "unknown error";
1765     int status = 0;
1766     struct stat statbuf;
1767
1768     memset(&statbuf, 0, sizeof(statbuf));
1769     if (stat(file, &statbuf) != 0)
1770     {
1771       if (errno == ENOENT)
1772         return 0;
1773
1774       reason = "stat error";
1775       status = errno;
1776     }
1777     else if (!S_ISREG(statbuf.st_mode))
1778     {
1779       reason = "not a regular file";
1780       status = EPERM;
1781     }
1782     if (statbuf.st_uid != daemon_uid)
1783     {
1784       reason = "not owned by daemon user";
1785       status = EACCES;
1786     }
1787     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1788     {
1789       reason = "must not be user/group writable";
1790       status = EACCES;
1791     }
1792
1793     if (status != 0)
1794     {
1795       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1796                file, rrd_strerror(status), reason);
1797       return 0;
1798     }
1799   }
1800
1801   fh = fopen(file, "r");
1802   if (fh == NULL)
1803   {
1804     if (errno != ENOENT)
1805       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1806                file, rrd_strerror(errno));
1807     return 0;
1808   }
1809   else
1810     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1811
1812   now = time(NULL);
1813
1814   while(!feof(fh))
1815   {
1816     size_t entry_len;
1817
1818     ++line;
1819     if (fgets(entry, sizeof(entry), fh) == NULL)
1820       break;
1821     entry_len = strlen(entry);
1822
1823     /* check \n termination in case journal writing crashed mid-line */
1824     if (entry_len == 0)
1825       continue;
1826     else if (entry[entry_len - 1] != '\n')
1827     {
1828       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1829       ++fail_cnt;
1830       continue;
1831     }
1832
1833     entry[entry_len - 1] = '\0';
1834
1835     if (handle_request(NULL, now, entry, entry_len) == 0)
1836       ++entry_cnt;
1837     else
1838       ++fail_cnt;
1839   }
1840
1841   fclose(fh);
1842
1843   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1844            entry_cnt, fail_cnt);
1845
1846   return entry_cnt > 0 ? 1 : 0;
1847 } /* }}} static int journal_replay */
1848
1849 static void journal_init(void) /* {{{ */
1850 {
1851   int had_journal = 0;
1852
1853   if (journal_cur == NULL) return;
1854
1855   pthread_mutex_lock(&journal_lock);
1856
1857   RRDD_LOG(LOG_INFO, "checking for journal files");
1858
1859   had_journal += journal_replay(journal_old);
1860   had_journal += journal_replay(journal_cur);
1861
1862   /* it must have been a crash.  start a flush */
1863   if (had_journal && config_flush_at_shutdown)
1864     flush_old_values(-1);
1865
1866   pthread_mutex_unlock(&journal_lock);
1867   journal_rotate();
1868
1869   RRDD_LOG(LOG_INFO, "journal processing complete");
1870
1871 } /* }}} static void journal_init */
1872
1873 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1874 {
1875   assert(sock != NULL);
1876
1877   free(sock->rbuf);  sock->rbuf = NULL;
1878   free(sock->wbuf);  sock->wbuf = NULL;
1879   free(sock);
1880 } /* }}} void free_listen_socket */
1881
1882 static void close_connection(listen_socket_t *sock) /* {{{ */
1883 {
1884   if (sock->fd >= 0)
1885   {
1886     close(sock->fd);
1887     sock->fd = -1;
1888   }
1889
1890   free_listen_socket(sock);
1891
1892 } /* }}} void close_connection */
1893
1894 static void *connection_thread_main (void *args) /* {{{ */
1895 {
1896   listen_socket_t *sock;
1897   int i;
1898   int fd;
1899
1900   sock = (listen_socket_t *) args;
1901   fd = sock->fd;
1902
1903   /* init read buffers */
1904   sock->next_read = sock->next_cmd = 0;
1905   sock->rbuf = malloc(RBUF_SIZE);
1906   if (sock->rbuf == NULL)
1907   {
1908     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1909     close_connection(sock);
1910     return NULL;
1911   }
1912
1913   pthread_mutex_lock (&connection_threads_lock);
1914   {
1915     pthread_t *temp;
1916
1917     temp = (pthread_t *) realloc (connection_threads,
1918         sizeof (pthread_t) * (connection_threads_num + 1));
1919     if (temp == NULL)
1920     {
1921       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1922     }
1923     else
1924     {
1925       connection_threads = temp;
1926       connection_threads[connection_threads_num] = pthread_self ();
1927       connection_threads_num++;
1928     }
1929   }
1930   pthread_mutex_unlock (&connection_threads_lock);
1931
1932   while (do_shutdown == 0)
1933   {
1934     char *cmd;
1935     ssize_t cmd_len;
1936     ssize_t rbytes;
1937     time_t now;
1938
1939     struct pollfd pollfd;
1940     int status;
1941
1942     pollfd.fd = fd;
1943     pollfd.events = POLLIN | POLLPRI;
1944     pollfd.revents = 0;
1945
1946     status = poll (&pollfd, 1, /* timeout = */ 500);
1947     if (do_shutdown)
1948       break;
1949     else if (status == 0) /* timeout */
1950       continue;
1951     else if (status < 0) /* error */
1952     {
1953       status = errno;
1954       if (status != EINTR)
1955         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1956       continue;
1957     }
1958
1959     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1960       break;
1961     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1962     {
1963       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1964           "poll(2) returned something unexpected: %#04hx",
1965           pollfd.revents);
1966       break;
1967     }
1968
1969     rbytes = read(fd, sock->rbuf + sock->next_read,
1970                   RBUF_SIZE - sock->next_read);
1971     if (rbytes < 0)
1972     {
1973       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1974       break;
1975     }
1976     else if (rbytes == 0)
1977       break; /* eof */
1978
1979     sock->next_read += rbytes;
1980
1981     if (sock->batch_start)
1982       now = sock->batch_start;
1983     else
1984       now = time(NULL);
1985
1986     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1987     {
1988       status = handle_request (sock, now, cmd, cmd_len+1);
1989       if (status != 0)
1990         goto out_close;
1991     }
1992   }
1993
1994 out_close:
1995   close_connection(sock);
1996
1997   /* Remove this thread from the connection threads list */
1998   pthread_mutex_lock (&connection_threads_lock);
1999   {
2000     pthread_t self;
2001     pthread_t *temp;
2002
2003     /* Find out own index in the array */
2004     self = pthread_self ();
2005     for (i = 0; i < connection_threads_num; i++)
2006       if (pthread_equal (connection_threads[i], self) != 0)
2007         break;
2008     assert (i < connection_threads_num);
2009
2010     /* Move the trailing threads forward. */
2011     if (i < (connection_threads_num - 1))
2012     {
2013       memmove (connection_threads + i,
2014                connection_threads + i + 1,
2015                sizeof (pthread_t) * (connection_threads_num - i - 1));
2016     }
2017
2018     connection_threads_num--;
2019
2020     temp = realloc(connection_threads,
2021                    sizeof(*connection_threads) * connection_threads_num);
2022     if (connection_threads_num > 0 && temp == NULL)
2023       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2024     else
2025       connection_threads = temp;
2026   }
2027   pthread_mutex_unlock (&connection_threads_lock);
2028
2029   return (NULL);
2030 } /* }}} void *connection_thread_main */
2031
2032 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2033 {
2034   int fd;
2035   struct sockaddr_un sa;
2036   listen_socket_t *temp;
2037   int status;
2038   const char *path;
2039
2040   path = sock->addr;
2041   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2042     path += strlen("unix:");
2043
2044   temp = (listen_socket_t *) realloc (listen_fds,
2045       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2046   if (temp == NULL)
2047   {
2048     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2049     return (-1);
2050   }
2051   listen_fds = temp;
2052   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2053
2054   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2055   if (fd < 0)
2056   {
2057     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2058              rrd_strerror(errno));
2059     return (-1);
2060   }
2061
2062   memset (&sa, 0, sizeof (sa));
2063   sa.sun_family = AF_UNIX;
2064   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2065
2066   /* if we've gotten this far, we own the pid file.  any daemon started
2067    * with the same args must not be alive.  therefore, ensure that we can
2068    * create the socket...
2069    */
2070   unlink(path);
2071
2072   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2073   if (status != 0)
2074   {
2075     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2076              path, rrd_strerror(errno));
2077     close (fd);
2078     return (-1);
2079   }
2080
2081   status = listen (fd, /* backlog = */ 10);
2082   if (status != 0)
2083   {
2084     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2085              path, rrd_strerror(errno));
2086     close (fd);
2087     unlink (path);
2088     return (-1);
2089   }
2090
2091   listen_fds[listen_fds_num].fd = fd;
2092   listen_fds[listen_fds_num].family = PF_UNIX;
2093   strncpy(listen_fds[listen_fds_num].addr, path,
2094           sizeof (listen_fds[listen_fds_num].addr) - 1);
2095   listen_fds_num++;
2096
2097   return (0);
2098 } /* }}} int open_listen_socket_unix */
2099
2100 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2101 {
2102   struct addrinfo ai_hints;
2103   struct addrinfo *ai_res;
2104   struct addrinfo *ai_ptr;
2105   char addr_copy[NI_MAXHOST];
2106   char *addr;
2107   char *port;
2108   int status;
2109
2110   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2111   addr_copy[sizeof (addr_copy) - 1] = 0;
2112   addr = addr_copy;
2113
2114   memset (&ai_hints, 0, sizeof (ai_hints));
2115   ai_hints.ai_flags = 0;
2116 #ifdef AI_ADDRCONFIG
2117   ai_hints.ai_flags |= AI_ADDRCONFIG;
2118 #endif
2119   ai_hints.ai_family = AF_UNSPEC;
2120   ai_hints.ai_socktype = SOCK_STREAM;
2121
2122   port = NULL;
2123   if (*addr == '[') /* IPv6+port format */
2124   {
2125     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2126     addr++;
2127
2128     port = strchr (addr, ']');
2129     if (port == NULL)
2130     {
2131       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2132       return (-1);
2133     }
2134     *port = 0;
2135     port++;
2136
2137     if (*port == ':')
2138       port++;
2139     else if (*port == 0)
2140       port = NULL;
2141     else
2142     {
2143       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2144       return (-1);
2145     }
2146   } /* if (*addr = ']') */
2147   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2148   {
2149     port = rindex(addr, ':');
2150     if (port != NULL)
2151     {
2152       *port = 0;
2153       port++;
2154     }
2155   }
2156   ai_res = NULL;
2157   status = getaddrinfo (addr,
2158                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2159                         &ai_hints, &ai_res);
2160   if (status != 0)
2161   {
2162     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2163              addr, gai_strerror (status));
2164     return (-1);
2165   }
2166
2167   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2168   {
2169     int fd;
2170     listen_socket_t *temp;
2171     int one = 1;
2172
2173     temp = (listen_socket_t *) realloc (listen_fds,
2174         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2175     if (temp == NULL)
2176     {
2177       fprintf (stderr,
2178                "rrdcached: open_listen_socket_network: realloc failed.\n");
2179       continue;
2180     }
2181     listen_fds = temp;
2182     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2183
2184     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2185     if (fd < 0)
2186     {
2187       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2188                rrd_strerror(errno));
2189       continue;
2190     }
2191
2192     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2193
2194     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2195     if (status != 0)
2196     {
2197       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2198                sock->addr, rrd_strerror(errno));
2199       close (fd);
2200       continue;
2201     }
2202
2203     status = listen (fd, /* backlog = */ 10);
2204     if (status != 0)
2205     {
2206       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2207                sock->addr, rrd_strerror(errno));
2208       close (fd);
2209       freeaddrinfo(ai_res);
2210       return (-1);
2211     }
2212
2213     listen_fds[listen_fds_num].fd = fd;
2214     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2215     listen_fds_num++;
2216   } /* for (ai_ptr) */
2217
2218   freeaddrinfo(ai_res);
2219   return (0);
2220 } /* }}} static int open_listen_socket_network */
2221
2222 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2223 {
2224   assert(sock != NULL);
2225   assert(sock->addr != NULL);
2226
2227   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2228       || sock->addr[0] == '/')
2229     return (open_listen_socket_unix(sock));
2230   else
2231     return (open_listen_socket_network(sock));
2232 } /* }}} int open_listen_socket */
2233
2234 static int close_listen_sockets (void) /* {{{ */
2235 {
2236   size_t i;
2237
2238   for (i = 0; i < listen_fds_num; i++)
2239   {
2240     close (listen_fds[i].fd);
2241
2242     if (listen_fds[i].family == PF_UNIX)
2243       unlink(listen_fds[i].addr);
2244   }
2245
2246   free (listen_fds);
2247   listen_fds = NULL;
2248   listen_fds_num = 0;
2249
2250   return (0);
2251 } /* }}} int close_listen_sockets */
2252
2253 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2254 {
2255   struct pollfd *pollfds;
2256   int pollfds_num;
2257   int status;
2258   int i;
2259
2260   if (listen_fds_num < 1)
2261   {
2262     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2263     return (NULL);
2264   }
2265
2266   pollfds_num = listen_fds_num;
2267   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2268   if (pollfds == NULL)
2269   {
2270     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2271     return (NULL);
2272   }
2273   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2274
2275   RRDD_LOG(LOG_INFO, "listening for connections");
2276
2277   while (do_shutdown == 0)
2278   {
2279     for (i = 0; i < pollfds_num; i++)
2280     {
2281       pollfds[i].fd = listen_fds[i].fd;
2282       pollfds[i].events = POLLIN | POLLPRI;
2283       pollfds[i].revents = 0;
2284     }
2285
2286     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2287     if (do_shutdown)
2288       break;
2289     else if (status == 0) /* timeout */
2290       continue;
2291     else if (status < 0) /* error */
2292     {
2293       status = errno;
2294       if (status != EINTR)
2295       {
2296         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2297       }
2298       continue;
2299     }
2300
2301     for (i = 0; i < pollfds_num; i++)
2302     {
2303       listen_socket_t *client_sock;
2304       struct sockaddr_storage client_sa;
2305       socklen_t client_sa_size;
2306       pthread_t tid;
2307       pthread_attr_t attr;
2308
2309       if (pollfds[i].revents == 0)
2310         continue;
2311
2312       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2313       {
2314         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2315             "poll(2) returned something unexpected for listen FD #%i.",
2316             pollfds[i].fd);
2317         continue;
2318       }
2319
2320       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2321       if (client_sock == NULL)
2322       {
2323         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2324         continue;
2325       }
2326       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2327
2328       client_sa_size = sizeof (client_sa);
2329       client_sock->fd = accept (pollfds[i].fd,
2330           (struct sockaddr *) &client_sa, &client_sa_size);
2331       if (client_sock->fd < 0)
2332       {
2333         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2334         free(client_sock);
2335         continue;
2336       }
2337
2338       pthread_attr_init (&attr);
2339       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2340
2341       status = pthread_create (&tid, &attr, connection_thread_main,
2342                                client_sock);
2343       if (status != 0)
2344       {
2345         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2346         close_connection(client_sock);
2347         continue;
2348       }
2349     } /* for (pollfds_num) */
2350   } /* while (do_shutdown == 0) */
2351
2352   RRDD_LOG(LOG_INFO, "starting shutdown");
2353
2354   close_listen_sockets ();
2355
2356   pthread_mutex_lock (&connection_threads_lock);
2357   while (connection_threads_num > 0)
2358   {
2359     pthread_t wait_for;
2360
2361     wait_for = connection_threads[0];
2362
2363     pthread_mutex_unlock (&connection_threads_lock);
2364     pthread_join (wait_for, /* retval = */ NULL);
2365     pthread_mutex_lock (&connection_threads_lock);
2366   }
2367   pthread_mutex_unlock (&connection_threads_lock);
2368
2369   free(pollfds);
2370
2371   return (NULL);
2372 } /* }}} void *listen_thread_main */
2373
2374 static int daemonize (void) /* {{{ */
2375 {
2376   int pid_fd;
2377   char *base_dir;
2378
2379   daemon_uid = geteuid();
2380
2381   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2382   if (pid_fd < 0)
2383     pid_fd = check_pidfile();
2384   if (pid_fd < 0)
2385     return pid_fd;
2386
2387   /* open all the listen sockets */
2388   if (config_listen_address_list_len > 0)
2389   {
2390     for (int i = 0; i < config_listen_address_list_len; i++)
2391     {
2392       open_listen_socket (config_listen_address_list[i]);
2393       free_listen_socket (config_listen_address_list[i]);
2394     }
2395
2396     free(config_listen_address_list);
2397   }
2398   else
2399   {
2400     listen_socket_t sock;
2401     memset(&sock, 0, sizeof(sock));
2402     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2403     open_listen_socket (&sock);
2404   }
2405
2406   if (listen_fds_num < 1)
2407   {
2408     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2409     goto error;
2410   }
2411
2412   if (!stay_foreground)
2413   {
2414     pid_t child;
2415
2416     child = fork ();
2417     if (child < 0)
2418     {
2419       fprintf (stderr, "daemonize: fork(2) failed.\n");
2420       goto error;
2421     }
2422     else if (child > 0)
2423       exit(0);
2424
2425     /* Become session leader */
2426     setsid ();
2427
2428     /* Open the first three file descriptors to /dev/null */
2429     close (2);
2430     close (1);
2431     close (0);
2432
2433     open ("/dev/null", O_RDWR);
2434     dup (0);
2435     dup (0);
2436   } /* if (!stay_foreground) */
2437
2438   /* Change into the /tmp directory. */
2439   base_dir = (config_base_dir != NULL)
2440     ? config_base_dir
2441     : "/tmp";
2442
2443   if (chdir (base_dir) != 0)
2444   {
2445     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2446     goto error;
2447   }
2448
2449   install_signal_handlers();
2450
2451   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2452   RRDD_LOG(LOG_INFO, "starting up");
2453
2454   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2455                                 (GDestroyNotify) free_cache_item);
2456   if (cache_tree == NULL)
2457   {
2458     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2459     goto error;
2460   }
2461
2462   return write_pidfile (pid_fd);
2463
2464 error:
2465   remove_pidfile();
2466   return -1;
2467 } /* }}} int daemonize */
2468
2469 static int cleanup (void) /* {{{ */
2470 {
2471   do_shutdown++;
2472
2473   pthread_cond_signal (&cache_cond);
2474   pthread_join (queue_thread, /* return = */ NULL);
2475
2476   remove_pidfile ();
2477
2478   free(config_base_dir);
2479   free(config_pid_file);
2480   free(journal_cur);
2481   free(journal_old);
2482
2483   pthread_mutex_lock(&cache_lock);
2484   g_tree_destroy(cache_tree);
2485
2486   RRDD_LOG(LOG_INFO, "goodbye");
2487   closelog ();
2488
2489   return (0);
2490 } /* }}} int cleanup */
2491
2492 static int read_options (int argc, char **argv) /* {{{ */
2493 {
2494   int option;
2495   int status = 0;
2496
2497   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2498   {
2499     switch (option)
2500     {
2501       case 'g':
2502         stay_foreground=1;
2503         break;
2504
2505       case 'L':
2506       case 'l':
2507       {
2508         listen_socket_t **temp;
2509         listen_socket_t *new;
2510
2511         new = malloc(sizeof(listen_socket_t));
2512         if (new == NULL)
2513         {
2514           fprintf(stderr, "read_options: malloc failed.\n");
2515           return(2);
2516         }
2517         memset(new, 0, sizeof(listen_socket_t));
2518
2519         temp = (listen_socket_t **) realloc (config_listen_address_list,
2520             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2521         if (temp == NULL)
2522         {
2523           fprintf (stderr, "read_options: realloc failed.\n");
2524           return (2);
2525         }
2526         config_listen_address_list = temp;
2527
2528         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2529         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2530
2531         temp[config_listen_address_list_len] = new;
2532         config_listen_address_list_len++;
2533       }
2534       break;
2535
2536       case 'f':
2537       {
2538         int temp;
2539
2540         temp = atoi (optarg);
2541         if (temp > 0)
2542           config_flush_interval = temp;
2543         else
2544         {
2545           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2546           status = 3;
2547         }
2548       }
2549       break;
2550
2551       case 'w':
2552       {
2553         int temp;
2554
2555         temp = atoi (optarg);
2556         if (temp > 0)
2557           config_write_interval = temp;
2558         else
2559         {
2560           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2561           status = 2;
2562         }
2563       }
2564       break;
2565
2566       case 'z':
2567       {
2568         int temp;
2569
2570         temp = atoi(optarg);
2571         if (temp > 0)
2572           config_write_jitter = temp;
2573         else
2574         {
2575           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2576           status = 2;
2577         }
2578
2579         break;
2580       }
2581
2582       case 'B':
2583         config_write_base_only = 1;
2584         break;
2585
2586       case 'b':
2587       {
2588         size_t len;
2589         char base_realpath[PATH_MAX];
2590
2591         if (config_base_dir != NULL)
2592           free (config_base_dir);
2593         config_base_dir = strdup (optarg);
2594         if (config_base_dir == NULL)
2595         {
2596           fprintf (stderr, "read_options: strdup failed.\n");
2597           return (3);
2598         }
2599
2600         /* make sure that the base directory is not resolved via
2601          * symbolic links.  this makes some performance-enhancing
2602          * assumptions possible (we don't have to resolve paths
2603          * that start with a "/")
2604          */
2605         if (realpath(config_base_dir, base_realpath) == NULL)
2606         {
2607           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2608           return 5;
2609         }
2610         else if (strncmp(config_base_dir,
2611                          base_realpath, sizeof(base_realpath)) != 0)
2612         {
2613           fprintf(stderr,
2614                   "Base directory (-b) resolved via file system links!\n"
2615                   "Please consult rrdcached '-b' documentation!\n"
2616                   "Consider specifying the real directory (%s)\n",
2617                   base_realpath);
2618           return 5;
2619         }
2620
2621         len = strlen (config_base_dir);
2622         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2623         {
2624           config_base_dir[len - 1] = 0;
2625           len--;
2626         }
2627
2628         if (len < 1)
2629         {
2630           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2631           return (4);
2632         }
2633
2634         _config_base_dir_len = len;
2635       }
2636       break;
2637
2638       case 'p':
2639       {
2640         if (config_pid_file != NULL)
2641           free (config_pid_file);
2642         config_pid_file = strdup (optarg);
2643         if (config_pid_file == NULL)
2644         {
2645           fprintf (stderr, "read_options: strdup failed.\n");
2646           return (3);
2647         }
2648       }
2649       break;
2650
2651       case 'F':
2652         config_flush_at_shutdown = 1;
2653         break;
2654
2655       case 'j':
2656       {
2657         struct stat statbuf;
2658         const char *dir = optarg;
2659
2660         status = stat(dir, &statbuf);
2661         if (status != 0)
2662         {
2663           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2664           return 6;
2665         }
2666
2667         if (!S_ISDIR(statbuf.st_mode)
2668             || access(dir, R_OK|W_OK|X_OK) != 0)
2669         {
2670           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2671                   errno ? rrd_strerror(errno) : "");
2672           return 6;
2673         }
2674
2675         journal_cur = malloc(PATH_MAX + 1);
2676         journal_old = malloc(PATH_MAX + 1);
2677         if (journal_cur == NULL || journal_old == NULL)
2678         {
2679           fprintf(stderr, "malloc failure for journal files\n");
2680           return 6;
2681         }
2682         else 
2683         {
2684           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2685           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2686         }
2687       }
2688       break;
2689
2690       case 'h':
2691       case '?':
2692         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2693             "\n"
2694             "Usage: rrdcached [options]\n"
2695             "\n"
2696             "Valid options are:\n"
2697             "  -l <address>  Socket address to listen to.\n"
2698             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2699             "  -w <seconds>  Interval in which to write data.\n"
2700             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2701             "  -f <seconds>  Interval in which to flush dead data.\n"
2702             "  -p <file>     Location of the PID-file.\n"
2703             "  -b <dir>      Base directory to change to.\n"
2704             "  -B            Restrict file access to paths within -b <dir>\n"
2705             "  -g            Do not fork and run in the foreground.\n"
2706             "  -j <dir>      Directory in which to create the journal files.\n"
2707             "  -F            Always flush all updates at shutdown\n"
2708             "\n"
2709             "For more information and a detailed description of all options "
2710             "please refer\n"
2711             "to the rrdcached(1) manual page.\n",
2712             VERSION);
2713         status = -1;
2714         break;
2715     } /* switch (option) */
2716   } /* while (getopt) */
2717
2718   /* advise the user when values are not sane */
2719   if (config_flush_interval < 2 * config_write_interval)
2720     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2721             " 2x write interval (-w) !\n");
2722   if (config_write_jitter > config_write_interval)
2723     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2724             " write interval (-w) !\n");
2725
2726   if (config_write_base_only && config_base_dir == NULL)
2727     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2728             "  Consult the rrdcached documentation\n");
2729
2730   if (journal_cur == NULL)
2731     config_flush_at_shutdown = 1;
2732
2733   return (status);
2734 } /* }}} int read_options */
2735
2736 int main (int argc, char **argv)
2737 {
2738   int status;
2739
2740   status = read_options (argc, argv);
2741   if (status != 0)
2742   {
2743     if (status < 0)
2744       status = 0;
2745     return (status);
2746   }
2747
2748   status = daemonize ();
2749   if (status != 0)
2750   {
2751     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2752     return (1);
2753   }
2754
2755   journal_init();
2756
2757   /* start the queue thread */
2758   memset (&queue_thread, 0, sizeof (queue_thread));
2759   status = pthread_create (&queue_thread,
2760                            NULL, /* attr */
2761                            queue_thread_main,
2762                            NULL); /* args */
2763   if (status != 0)
2764   {
2765     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2766     cleanup();
2767     return (1);
2768   }
2769
2770   listen_thread_main (NULL);
2771   cleanup ();
2772
2773   return (0);
2774 } /* int main */
2775
2776 /*
2777  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2778  */