rrdcached: pull in rrd_config.h so we can use its defines
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
85
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
100
101 #include <glib-2.0/glib.h>
102 /* }}} */
103
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
105
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
109
110 /*
111  * Types
112  */
113 typedef enum
114 {
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
118
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
127
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
131
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
136
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
141
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
158
159 struct callback_flush_data_s
160 {
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
167
168 enum queue_side_e
169 {
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
174
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
178
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
184
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
187
188 static int do_shutdown = 0;
189
190 static pthread_t queue_thread;
191
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
195
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
202
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
211
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
214
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
223
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
232
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
242
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
247
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
252
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
258
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
264
265 static void install_signal_handlers(void) /* {{{ */
266 {
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
274
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
279
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
283
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
287
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
291
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
295
296 } /* }}} void install_signal_handlers */
297
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300   int fd;
301   char *file;
302
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
306
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
311
312   return(fd);
313 } /* }}} static int open_pidfile */
314
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
317 {
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
321
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
325
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
328
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
332
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
342
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
345
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
349
350   return pid_fd;
351 } /* }}} static int check_pidfile */
352
353 static int write_pidfile (int fd) /* {{{ */
354 {
355   pid_t pid;
356   FILE *fh;
357
358   pid = getpid ();
359
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
367
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
370
371   return (0);
372 } /* }}} int write_pidfile */
373
374 static int remove_pidfile (void) /* {{{ */
375 {
376   char *file;
377   int status;
378
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
382
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
388
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391   char *eol;
392
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
395
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
410
411     sock->next_cmd = eol - sock->rbuf + 1;
412
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
415
416     *len = eol - cmd;
417
418     return cmd;
419   }
420
421   /* NOTREACHED */
422   assert(1==0);
423 }
424
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428   char *new_buf;
429
430   assert(sock != NULL);
431
432   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
438
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
440
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
443
444   return 0;
445 } /* }}} static int add_to_wbuf */
446
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
453
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
456
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
469
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
472
473 static int count_lines(char *str) /* {{{ */
474 {
475   int lines = 0;
476
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
485
486   return lines;
487 } /* }}} static int count_lines */
488
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
494 {
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
500
501   if (sock == NULL) return rc;  /* journal replay mode */
502
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
513
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
524
525   len += rclen;
526
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
530
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
537
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
552
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
555
556   return 0;
557 } /* }}} */
558
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561   ci->values = NULL;
562   ci->values_num = 0;
563
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
567 }
568
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
577
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
582
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
587
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
591
592 /* remove an entry from the tree and free all its resources.
593  * must hold 'cache lock' while calling this.
594  * returns 0 on success, otherwise errno */
595 static int forget_file(const char *file)
596 {
597   cache_item_t *ci;
598
599   ci = g_tree_lookup(cache_tree, file);
600   if (ci == NULL)
601     return ENOENT;
602
603   g_tree_remove (cache_tree, file);
604   remove_from_queue(ci);
605
606   for (int i=0; i < ci->values_num; i++)
607     free(ci->values[i]);
608
609   free (ci->values);
610   free (ci->file);
611
612   /* in case anyone is waiting */
613   pthread_cond_broadcast(&ci->flushed);
614
615   free (ci);
616
617   return 0;
618 } /* }}} static int forget_file */
619
620 /*
621  * enqueue_cache_item:
622  * `cache_lock' must be acquired before calling this function!
623  */
624 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
625     queue_side_t side)
626 {
627   if (ci == NULL)
628     return (-1);
629
630   if (ci->values_num == 0)
631     return (0);
632
633   if (side == HEAD)
634   {
635     if (cache_queue_head == ci)
636       return 0;
637
638     /* remove if further down in queue */
639     remove_from_queue(ci);
640
641     ci->prev = NULL;
642     ci->next = cache_queue_head;
643     if (ci->next != NULL)
644       ci->next->prev = ci;
645     cache_queue_head = ci;
646
647     if (cache_queue_tail == NULL)
648       cache_queue_tail = cache_queue_head;
649   }
650   else /* (side == TAIL) */
651   {
652     /* We don't move values back in the list.. */
653     if (ci->flags & CI_FLAGS_IN_QUEUE)
654       return (0);
655
656     assert (ci->next == NULL);
657     assert (ci->prev == NULL);
658
659     ci->prev = cache_queue_tail;
660
661     if (cache_queue_tail == NULL)
662       cache_queue_head = ci;
663     else
664       cache_queue_tail->next = ci;
665
666     cache_queue_tail = ci;
667   }
668
669   ci->flags |= CI_FLAGS_IN_QUEUE;
670
671   pthread_cond_broadcast(&cache_cond);
672   pthread_mutex_lock (&stats_lock);
673   stats_queue_length++;
674   pthread_mutex_unlock (&stats_lock);
675
676   return (0);
677 } /* }}} int enqueue_cache_item */
678
679 /*
680  * tree_callback_flush:
681  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
682  * while this is in progress.
683  */
684 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
685     gpointer data)
686 {
687   cache_item_t *ci;
688   callback_flush_data_t *cfd;
689
690   ci = (cache_item_t *) value;
691   cfd = (callback_flush_data_t *) data;
692
693   if (ci->flags & CI_FLAGS_IN_QUEUE)
694     return FALSE;
695
696   if ((ci->last_flush_time <= cfd->abs_timeout)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if ((do_shutdown != 0)
702       && (ci->values_num > 0))
703   {
704     enqueue_cache_item (ci, TAIL);
705   }
706   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
707       && (ci->values_num <= 0))
708   {
709     char **temp;
710
711     temp = (char **) realloc (cfd->keys,
712         sizeof (char *) * (cfd->keys_num + 1));
713     if (temp == NULL)
714     {
715       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
716       return (FALSE);
717     }
718     cfd->keys = temp;
719     /* Make really sure this points to the _same_ place */
720     assert ((char *) key == ci->file);
721     cfd->keys[cfd->keys_num] = (char *) key;
722     cfd->keys_num++;
723   }
724
725   return (FALSE);
726 } /* }}} gboolean tree_callback_flush */
727
728 static int flush_old_values (int max_age)
729 {
730   callback_flush_data_t cfd;
731   size_t k;
732
733   memset (&cfd, 0, sizeof (cfd));
734   /* Pass the current time as user data so that we don't need to call
735    * `time' for each node. */
736   cfd.now = time (NULL);
737   cfd.keys = NULL;
738   cfd.keys_num = 0;
739
740   if (max_age > 0)
741     cfd.abs_timeout = cfd.now - max_age;
742   else
743     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
744
745   /* `tree_callback_flush' will return the keys of all values that haven't
746    * been touched in the last `config_flush_interval' seconds in `cfd'.
747    * The char*'s in this array point to the same memory as ci->file, so we
748    * don't need to free them separately. */
749   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
750
751   for (k = 0; k < cfd.keys_num; k++)
752   {
753     /* should never fail, since we have held the cache_lock
754      * the entire time */
755     assert( forget_file(cfd.keys[k]) == 0 );
756   }
757
758   if (cfd.keys != NULL)
759   {
760     free (cfd.keys);
761     cfd.keys = NULL;
762   }
763
764   return (0);
765 } /* int flush_old_values */
766
767 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
768 {
769   struct timeval now;
770   struct timespec next_flush;
771   int final_flush = 0; /* make sure we only flush once on shutdown */
772
773   gettimeofday (&now, NULL);
774   next_flush.tv_sec = now.tv_sec + config_flush_interval;
775   next_flush.tv_nsec = 1000 * now.tv_usec;
776
777   pthread_mutex_lock (&cache_lock);
778   while ((do_shutdown == 0) || (cache_queue_head != NULL))
779   {
780     cache_item_t *ci;
781     char *file;
782     char **values;
783     int values_num;
784     int status;
785     int i;
786
787     /* First, check if it's time to do the cache flush. */
788     gettimeofday (&now, NULL);
789     if ((now.tv_sec > next_flush.tv_sec)
790         || ((now.tv_sec == next_flush.tv_sec)
791           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
792     {
793       /* Flush all values that haven't been written in the last
794        * `config_write_interval' seconds. */
795       flush_old_values (config_write_interval);
796
797       /* Determine the time of the next cache flush. */
798       next_flush.tv_sec =
799         now.tv_sec + next_flush.tv_sec % config_flush_interval;
800
801       /* unlock the cache while we rotate so we don't block incoming
802        * updates if the fsync() blocks on disk I/O */
803       pthread_mutex_unlock(&cache_lock);
804       journal_rotate();
805       pthread_mutex_lock(&cache_lock);
806     }
807
808     /* Now, check if there's something to store away. If not, wait until
809      * something comes in or it's time to do the cache flush.  if we are
810      * shutting down, do not wait around.  */
811     if (cache_queue_head == NULL && !do_shutdown)
812     {
813       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
814       if ((status != 0) && (status != ETIMEDOUT))
815       {
816         RRDD_LOG (LOG_ERR, "queue_thread_main: "
817             "pthread_cond_timedwait returned %i.", status);
818       }
819     }
820
821     /* We're about to shut down */
822     if (do_shutdown != 0 && !final_flush++)
823     {
824       if (config_flush_at_shutdown)
825         flush_old_values (-1); /* flush everything */
826       else
827         break;
828     }
829
830     /* Check if a value has arrived. This may be NULL if we timed out or there
831      * was an interrupt such as a signal. */
832     if (cache_queue_head == NULL)
833       continue;
834
835     ci = cache_queue_head;
836
837     /* copy the relevant parts */
838     file = strdup (ci->file);
839     if (file == NULL)
840     {
841       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
842       continue;
843     }
844
845     assert(ci->values != NULL);
846     assert(ci->values_num > 0);
847
848     values = ci->values;
849     values_num = ci->values_num;
850
851     wipe_ci_values(ci, time(NULL));
852     remove_from_queue(ci);
853
854     pthread_mutex_lock (&stats_lock);
855     assert (stats_queue_length > 0);
856     stats_queue_length--;
857     pthread_mutex_unlock (&stats_lock);
858
859     pthread_mutex_unlock (&cache_lock);
860
861     rrd_clear_error ();
862     status = rrd_update_r (file, NULL, values_num, (void *) values);
863     if (status != 0)
864     {
865       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
866           "rrd_update_r (%s) failed with status %i. (%s)",
867           file, status, rrd_get_error());
868     }
869
870     journal_write("wrote", file);
871     pthread_cond_broadcast(&ci->flushed);
872
873     for (i = 0; i < values_num; i++)
874       free (values[i]);
875
876     free(values);
877     free(file);
878
879     if (status == 0)
880     {
881       pthread_mutex_lock (&stats_lock);
882       stats_updates_written++;
883       stats_data_sets_written += values_num;
884       pthread_mutex_unlock (&stats_lock);
885     }
886
887     pthread_mutex_lock (&cache_lock);
888
889     /* We're about to shut down */
890     if (do_shutdown != 0 && !final_flush++)
891     {
892       if (config_flush_at_shutdown)
893           flush_old_values (-1); /* flush everything */
894       else
895         break;
896     }
897   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
898   pthread_mutex_unlock (&cache_lock);
899
900   if (config_flush_at_shutdown)
901   {
902     assert(cache_queue_head == NULL);
903     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
904   }
905
906   journal_done();
907
908   return (NULL);
909 } /* }}} void *queue_thread_main */
910
911 static int buffer_get_field (char **buffer_ret, /* {{{ */
912     size_t *buffer_size_ret, char **field_ret)
913 {
914   char *buffer;
915   size_t buffer_pos;
916   size_t buffer_size;
917   char *field;
918   size_t field_size;
919   int status;
920
921   buffer = *buffer_ret;
922   buffer_pos = 0;
923   buffer_size = *buffer_size_ret;
924   field = *buffer_ret;
925   field_size = 0;
926
927   if (buffer_size <= 0)
928     return (-1);
929
930   /* This is ensured by `handle_request'. */
931   assert (buffer[buffer_size - 1] == '\0');
932
933   status = -1;
934   while (buffer_pos < buffer_size)
935   {
936     /* Check for end-of-field or end-of-buffer */
937     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
938     {
939       field[field_size] = 0;
940       field_size++;
941       buffer_pos++;
942       status = 0;
943       break;
944     }
945     /* Handle escaped characters. */
946     else if (buffer[buffer_pos] == '\\')
947     {
948       if (buffer_pos >= (buffer_size - 1))
949         break;
950       buffer_pos++;
951       field[field_size] = buffer[buffer_pos];
952       field_size++;
953       buffer_pos++;
954     }
955     /* Normal operation */ 
956     else
957     {
958       field[field_size] = buffer[buffer_pos];
959       field_size++;
960       buffer_pos++;
961     }
962   } /* while (buffer_pos < buffer_size) */
963
964   if (status != 0)
965     return (status);
966
967   *buffer_ret = buffer + buffer_pos;
968   *buffer_size_ret = buffer_size - buffer_pos;
969   *field_ret = field;
970
971   return (0);
972 } /* }}} int buffer_get_field */
973
974 /* if we're restricting writes to the base directory,
975  * check whether the file falls within the dir
976  * returns 1 if OK, otherwise 0
977  */
978 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
979 {
980   assert(file != NULL);
981
982   if (!config_write_base_only
983       || sock == NULL /* journal replay */
984       || config_base_dir == NULL)
985     return 1;
986
987   if (strstr(file, "../") != NULL) goto err;
988
989   /* relative paths without "../" are ok */
990   if (*file != '/') return 1;
991
992   /* file must be of the format base + "/" + <1+ char filename> */
993   if (strlen(file) < _config_base_dir_len + 2) goto err;
994   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
995   if (*(file + _config_base_dir_len) != '/') goto err;
996
997   return 1;
998
999 err:
1000   if (sock != NULL && sock->fd >= 0)
1001     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1002
1003   return 0;
1004 } /* }}} static int check_file_access */
1005
1006 /* when using a base dir, convert relative paths to absolute paths.
1007  * if necessary, modifies the "filename" pointer to point
1008  * to the new path created in "tmp".  "tmp" is provided
1009  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1010  *
1011  * this allows us to optimize for the expected case (absolute path)
1012  * with a no-op.
1013  */
1014 static void get_abs_path(char **filename, char *tmp)
1015 {
1016   assert(tmp != NULL);
1017   assert(filename != NULL && *filename != NULL);
1018
1019   if (config_base_dir == NULL || **filename == '/')
1020     return;
1021
1022   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1023   *filename = tmp;
1024 } /* }}} static int get_abs_path */
1025
1026 /* returns 1 if we have the required privilege level,
1027  * otherwise issue an error to the user on sock */
1028 static int has_privilege (listen_socket_t *sock, /* {{{ */
1029                           socket_privilege priv)
1030 {
1031   if (sock == NULL) /* journal replay */
1032     return 1;
1033
1034   if (sock->privilege >= priv)
1035     return 1;
1036
1037   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1038 } /* }}} static int has_privilege */
1039
1040 static int flush_file (const char *filename) /* {{{ */
1041 {
1042   cache_item_t *ci;
1043
1044   pthread_mutex_lock (&cache_lock);
1045
1046   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1047   if (ci == NULL)
1048   {
1049     pthread_mutex_unlock (&cache_lock);
1050     return (ENOENT);
1051   }
1052
1053   if (ci->values_num > 0)
1054   {
1055     /* Enqueue at head */
1056     enqueue_cache_item (ci, HEAD);
1057     pthread_cond_wait(&ci->flushed, &cache_lock);
1058   }
1059
1060   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1061    * may have been purged during our cond_wait() */
1062
1063   pthread_mutex_unlock(&cache_lock);
1064
1065   return (0);
1066 } /* }}} int flush_file */
1067
1068 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1069     char *buffer, size_t buffer_size)
1070 {
1071   int status;
1072   char **help_text;
1073   char *command;
1074
1075   char *help_help[2] =
1076   {
1077     "Command overview\n"
1078     ,
1079     "HELP [<command>]\n"
1080     "FLUSH <filename>\n"
1081     "FLUSHALL\n"
1082     "PENDING <filename>\n"
1083     "FORGET <filename>\n"
1084     "UPDATE <filename> <values> [<values> ...]\n"
1085     "BATCH\n"
1086     "STATS\n"
1087   };
1088
1089   char *help_flush[2] =
1090   {
1091     "Help for FLUSH\n"
1092     ,
1093     "Usage: FLUSH <filename>\n"
1094     "\n"
1095     "Adds the given filename to the head of the update queue and returns\n"
1096     "after is has been dequeued.\n"
1097   };
1098
1099   char *help_flushall[2] =
1100   {
1101     "Help for FLUSHALL\n"
1102     ,
1103     "Usage: FLUSHALL\n"
1104     "\n"
1105     "Triggers writing of all pending updates.  Returns immediately.\n"
1106   };
1107
1108   char *help_pending[2] =
1109   {
1110     "Help for PENDING\n"
1111     ,
1112     "Usage: PENDING <filename>\n"
1113     "\n"
1114     "Shows any 'pending' updates for a file, in order.\n"
1115     "The updates shown have not yet been written to the underlying RRD file.\n"
1116   };
1117
1118   char *help_forget[2] =
1119   {
1120     "Help for FORGET\n"
1121     ,
1122     "Usage: FORGET <filename>\n"
1123     "\n"
1124     "Removes the file completely from the cache.\n"
1125     "Any pending updates for the file will be lost.\n"
1126   };
1127
1128   char *help_update[2] =
1129   {
1130     "Help for UPDATE\n"
1131     ,
1132     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1133     "\n"
1134     "Adds the given file to the internal cache if it is not yet known and\n"
1135     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1136     "for details.\n"
1137     "\n"
1138     "Each <values> has the following form:\n"
1139     "  <values> = <time>:<value>[:<value>[...]]\n"
1140     "See the rrdupdate(1) manpage for details.\n"
1141   };
1142
1143   char *help_stats[2] =
1144   {
1145     "Help for STATS\n"
1146     ,
1147     "Usage: STATS\n"
1148     "\n"
1149     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1150     "a description of the values.\n"
1151   };
1152
1153   char *help_batch[2] =
1154   {
1155     "Help for BATCH\n"
1156     ,
1157     "The 'BATCH' command permits the client to initiate a bulk load\n"
1158     "   of commands to rrdcached.\n"
1159     "\n"
1160     "Usage:\n"
1161     "\n"
1162     "    client: BATCH\n"
1163     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1164     "    client: command #1\n"
1165     "    client: command #2\n"
1166     "    client: ... and so on\n"
1167     "    client: .\n"
1168     "    server: 2 errors\n"
1169     "    server: 7 message for command #7\n"
1170     "    server: 9 message for command #9\n"
1171     "\n"
1172     "For more information, consult the rrdcached(1) documentation.\n"
1173   };
1174
1175   status = buffer_get_field (&buffer, &buffer_size, &command);
1176   if (status != 0)
1177     help_text = help_help;
1178   else
1179   {
1180     if (strcasecmp (command, "update") == 0)
1181       help_text = help_update;
1182     else if (strcasecmp (command, "flush") == 0)
1183       help_text = help_flush;
1184     else if (strcasecmp (command, "flushall") == 0)
1185       help_text = help_flushall;
1186     else if (strcasecmp (command, "pending") == 0)
1187       help_text = help_pending;
1188     else if (strcasecmp (command, "forget") == 0)
1189       help_text = help_forget;
1190     else if (strcasecmp (command, "stats") == 0)
1191       help_text = help_stats;
1192     else if (strcasecmp (command, "batch") == 0)
1193       help_text = help_batch;
1194     else
1195       help_text = help_help;
1196   }
1197
1198   add_response_info(sock, help_text[1]);
1199   return send_response(sock, RESP_OK, help_text[0]);
1200 } /* }}} int handle_request_help */
1201
1202 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1203 {
1204   uint64_t copy_queue_length;
1205   uint64_t copy_updates_received;
1206   uint64_t copy_flush_received;
1207   uint64_t copy_updates_written;
1208   uint64_t copy_data_sets_written;
1209   uint64_t copy_journal_bytes;
1210   uint64_t copy_journal_rotate;
1211
1212   uint64_t tree_nodes_number;
1213   uint64_t tree_depth;
1214
1215   pthread_mutex_lock (&stats_lock);
1216   copy_queue_length       = stats_queue_length;
1217   copy_updates_received   = stats_updates_received;
1218   copy_flush_received     = stats_flush_received;
1219   copy_updates_written    = stats_updates_written;
1220   copy_data_sets_written  = stats_data_sets_written;
1221   copy_journal_bytes      = stats_journal_bytes;
1222   copy_journal_rotate     = stats_journal_rotate;
1223   pthread_mutex_unlock (&stats_lock);
1224
1225   pthread_mutex_lock (&cache_lock);
1226   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1227   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1228   pthread_mutex_unlock (&cache_lock);
1229
1230   add_response_info(sock,
1231                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1232   add_response_info(sock,
1233                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1234   add_response_info(sock,
1235                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1236   add_response_info(sock,
1237                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1238   add_response_info(sock,
1239                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1240   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1241   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1242   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1243   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1244
1245   send_response(sock, RESP_OK, "Statistics follow\n");
1246
1247   return (0);
1248 } /* }}} int handle_request_stats */
1249
1250 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1251     char *buffer, size_t buffer_size)
1252 {
1253   char *file, file_tmp[PATH_MAX];
1254   int status;
1255
1256   status = buffer_get_field (&buffer, &buffer_size, &file);
1257   if (status != 0)
1258   {
1259     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1260   }
1261   else
1262   {
1263     pthread_mutex_lock(&stats_lock);
1264     stats_flush_received++;
1265     pthread_mutex_unlock(&stats_lock);
1266
1267     get_abs_path(&file, file_tmp);
1268     if (!check_file_access(file, sock)) return 0;
1269
1270     status = flush_file (file);
1271     if (status == 0)
1272       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1273     else if (status == ENOENT)
1274     {
1275       /* no file in our tree; see whether it exists at all */
1276       struct stat statbuf;
1277
1278       memset(&statbuf, 0, sizeof(statbuf));
1279       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1280         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1281       else
1282         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1283     }
1284     else if (status < 0)
1285       return send_response(sock, RESP_ERR, "Internal error.\n");
1286     else
1287       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1288   }
1289
1290   /* NOTREACHED */
1291   assert(1==0);
1292 } /* }}} int handle_request_flush */
1293
1294 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1295 {
1296   int status;
1297
1298   status = has_privilege(sock, PRIV_HIGH);
1299   if (status <= 0)
1300     return status;
1301
1302   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1303
1304   pthread_mutex_lock(&cache_lock);
1305   flush_old_values(-1);
1306   pthread_mutex_unlock(&cache_lock);
1307
1308   return send_response(sock, RESP_OK, "Started flush.\n");
1309 } /* }}} static int handle_request_flushall */
1310
1311 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1312                                   char *buffer, size_t buffer_size)
1313 {
1314   int status;
1315   char *file, file_tmp[PATH_MAX];
1316   cache_item_t *ci;
1317
1318   status = buffer_get_field(&buffer, &buffer_size, &file);
1319   if (status != 0)
1320     return send_response(sock, RESP_ERR,
1321                          "Usage: PENDING <filename>\n");
1322
1323   status = has_privilege(sock, PRIV_HIGH);
1324   if (status <= 0)
1325     return status;
1326
1327   get_abs_path(&file, file_tmp);
1328
1329   pthread_mutex_lock(&cache_lock);
1330   ci = g_tree_lookup(cache_tree, file);
1331   if (ci == NULL)
1332   {
1333     pthread_mutex_unlock(&cache_lock);
1334     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1335   }
1336
1337   for (int i=0; i < ci->values_num; i++)
1338     add_response_info(sock, "%s\n", ci->values[i]);
1339
1340   pthread_mutex_unlock(&cache_lock);
1341   return send_response(sock, RESP_OK, "updates pending\n");
1342 } /* }}} static int handle_request_pending */
1343
1344 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1345                                  char *buffer, size_t buffer_size)
1346 {
1347   int status;
1348   char *file, file_tmp[PATH_MAX];
1349
1350   status = buffer_get_field(&buffer, &buffer_size, &file);
1351   if (status != 0)
1352     return send_response(sock, RESP_ERR,
1353                          "Usage: FORGET <filename>\n");
1354
1355   status = has_privilege(sock, PRIV_HIGH);
1356   if (status <= 0)
1357     return status;
1358
1359   get_abs_path(&file, file_tmp);
1360   if (!check_file_access(file, sock)) return 0;
1361
1362   pthread_mutex_lock(&cache_lock);
1363   status = forget_file(file);
1364   pthread_mutex_unlock(&cache_lock);
1365
1366   if (status == 0)
1367   {
1368     if (sock != NULL)
1369       journal_write("forget", file);
1370
1371     return send_response(sock, RESP_OK, "Gone!\n");
1372   }
1373   else
1374     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1375                          status < 0 ? "Internal error" : rrd_strerror(status));
1376
1377   /* NOTREACHED */
1378   assert(1==0);
1379 } /* }}} static int handle_request_forget */
1380
1381 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1382                                   time_t now,
1383                                   char *buffer, size_t buffer_size)
1384 {
1385   char *file, file_tmp[PATH_MAX];
1386   int values_num = 0;
1387   int bad_timestamps = 0;
1388   int status;
1389   char orig_buf[CMD_MAX];
1390
1391   cache_item_t *ci;
1392
1393   status = has_privilege(sock, PRIV_HIGH);
1394   if (status <= 0)
1395     return status;
1396
1397   /* save it for the journal later */
1398   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1399
1400   status = buffer_get_field (&buffer, &buffer_size, &file);
1401   if (status != 0)
1402     return send_response(sock, RESP_ERR,
1403                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1404
1405   pthread_mutex_lock(&stats_lock);
1406   stats_updates_received++;
1407   pthread_mutex_unlock(&stats_lock);
1408
1409   get_abs_path(&file, file_tmp);
1410   if (!check_file_access(file, sock)) return 0;
1411
1412   pthread_mutex_lock (&cache_lock);
1413   ci = g_tree_lookup (cache_tree, file);
1414
1415   if (ci == NULL) /* {{{ */
1416   {
1417     struct stat statbuf;
1418
1419     /* don't hold the lock while we setup; stat(2) might block */
1420     pthread_mutex_unlock(&cache_lock);
1421
1422     memset (&statbuf, 0, sizeof (statbuf));
1423     status = stat (file, &statbuf);
1424     if (status != 0)
1425     {
1426       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1427
1428       status = errno;
1429       if (status == ENOENT)
1430         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1431       else
1432         return send_response(sock, RESP_ERR,
1433                              "stat failed with error %i.\n", status);
1434     }
1435     if (!S_ISREG (statbuf.st_mode))
1436       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1437
1438     if (access(file, R_OK|W_OK) != 0)
1439       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1440                            file, rrd_strerror(errno));
1441
1442     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1443     if (ci == NULL)
1444     {
1445       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1446
1447       return send_response(sock, RESP_ERR, "malloc failed.\n");
1448     }
1449     memset (ci, 0, sizeof (cache_item_t));
1450
1451     ci->file = strdup (file);
1452     if (ci->file == NULL)
1453     {
1454       free (ci);
1455       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1456
1457       return send_response(sock, RESP_ERR, "strdup failed.\n");
1458     }
1459
1460     wipe_ci_values(ci, now);
1461     ci->flags = CI_FLAGS_IN_TREE;
1462     pthread_cond_init(&ci->flushed, NULL);
1463
1464     pthread_mutex_lock(&cache_lock);
1465     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1466   } /* }}} */
1467   assert (ci != NULL);
1468
1469   /* don't re-write updates in replay mode */
1470   if (sock != NULL)
1471     journal_write("update", orig_buf);
1472
1473   while (buffer_size > 0)
1474   {
1475     char **temp;
1476     char *value;
1477     time_t stamp;
1478     char *eostamp;
1479
1480     status = buffer_get_field (&buffer, &buffer_size, &value);
1481     if (status != 0)
1482     {
1483       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1484       break;
1485     }
1486
1487     /* make sure update time is always moving forward */
1488     stamp = strtol(value, &eostamp, 10);
1489     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1490     {
1491       ++bad_timestamps;
1492       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1493       continue;
1494     }
1495     else if (stamp <= ci->last_update_stamp)
1496     {
1497       ++bad_timestamps;
1498       add_response_info(sock,
1499                         "illegal attempt to update using time %ld when"
1500                         " last update time is %ld (minimum one second step)\n",
1501                         stamp, ci->last_update_stamp);
1502       continue;
1503     }
1504     else
1505       ci->last_update_stamp = stamp;
1506
1507     temp = (char **) realloc (ci->values,
1508         sizeof (char *) * (ci->values_num + 1));
1509     if (temp == NULL)
1510     {
1511       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1512       continue;
1513     }
1514     ci->values = temp;
1515
1516     ci->values[ci->values_num] = strdup (value);
1517     if (ci->values[ci->values_num] == NULL)
1518     {
1519       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1520       continue;
1521     }
1522     ci->values_num++;
1523
1524     values_num++;
1525   }
1526
1527   if (((now - ci->last_flush_time) >= config_write_interval)
1528       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1529       && (ci->values_num > 0))
1530   {
1531     enqueue_cache_item (ci, TAIL);
1532   }
1533
1534   pthread_mutex_unlock (&cache_lock);
1535
1536   if (values_num < 1)
1537   {
1538     /* journal replay mode */
1539     if (sock == NULL) return RESP_ERR;
1540
1541     /* if we had only one update attempt, then return the full
1542        error message... try to get the most information out
1543        of the limited error space allowed by the protocol
1544     */
1545     if (bad_timestamps == 1)
1546       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1547     else
1548       return send_response(sock, RESP_ERR,
1549                            "No values updated (%d bad timestamps).\n",
1550                            bad_timestamps);
1551   }
1552   else
1553     return send_response(sock, RESP_OK,
1554                          "errors, enqueued %i value(s).\n", values_num);
1555
1556   /* NOTREACHED */
1557   assert(1==0);
1558
1559 } /* }}} int handle_request_update */
1560
1561 /* we came across a "WROTE" entry during journal replay.
1562  * throw away any values that we have accumulated for this file
1563  */
1564 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1565 {
1566   int i;
1567   cache_item_t *ci;
1568   const char *file = buffer;
1569
1570   pthread_mutex_lock(&cache_lock);
1571
1572   ci = g_tree_lookup(cache_tree, file);
1573   if (ci == NULL)
1574   {
1575     pthread_mutex_unlock(&cache_lock);
1576     return (0);
1577   }
1578
1579   if (ci->values)
1580   {
1581     for (i=0; i < ci->values_num; i++)
1582       free(ci->values[i]);
1583
1584     free(ci->values);
1585   }
1586
1587   wipe_ci_values(ci, now);
1588   remove_from_queue(ci);
1589
1590   pthread_mutex_unlock(&cache_lock);
1591   return (0);
1592 } /* }}} int handle_request_wrote */
1593
1594 /* start "BATCH" processing */
1595 static int batch_start (listen_socket_t *sock) /* {{{ */
1596 {
1597   int status;
1598   if (sock->batch_start)
1599     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1600
1601   status = send_response(sock, RESP_OK,
1602                          "Go ahead.  End with dot '.' on its own line.\n");
1603   sock->batch_start = time(NULL);
1604   sock->batch_cmd = 0;
1605
1606   return status;
1607 } /* }}} static int batch_start */
1608
1609 /* finish "BATCH" processing and return results to the client */
1610 static int batch_done (listen_socket_t *sock) /* {{{ */
1611 {
1612   assert(sock->batch_start);
1613   sock->batch_start = 0;
1614   sock->batch_cmd  = 0;
1615   return send_response(sock, RESP_OK, "errors\n");
1616 } /* }}} static int batch_done */
1617
1618 /* if sock==NULL, we are in journal replay mode */
1619 static int handle_request (listen_socket_t *sock, /* {{{ */
1620                            time_t now,
1621                            char *buffer, size_t buffer_size)
1622 {
1623   char *buffer_ptr;
1624   char *command;
1625   int status;
1626
1627   assert (buffer[buffer_size - 1] == '\0');
1628
1629   buffer_ptr = buffer;
1630   command = NULL;
1631   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1632   if (status != 0)
1633   {
1634     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1635     return (-1);
1636   }
1637
1638   if (sock != NULL && sock->batch_start)
1639     sock->batch_cmd++;
1640
1641   if (strcasecmp (command, "update") == 0)
1642     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1643   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1644   {
1645     /* this is only valid in replay mode */
1646     return (handle_request_wrote (buffer_ptr, now));
1647   }
1648   else if (strcasecmp (command, "flush") == 0)
1649     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1650   else if (strcasecmp (command, "flushall") == 0)
1651     return (handle_request_flushall(sock));
1652   else if (strcasecmp (command, "pending") == 0)
1653     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1654   else if (strcasecmp (command, "forget") == 0)
1655     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1656   else if (strcasecmp (command, "stats") == 0)
1657     return (handle_request_stats (sock));
1658   else if (strcasecmp (command, "help") == 0)
1659     return (handle_request_help (sock, buffer_ptr, buffer_size));
1660   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1661     return batch_start(sock);
1662   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1663     return batch_done(sock);
1664   else
1665     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1666
1667   /* NOTREACHED */
1668   assert(1==0);
1669 } /* }}} int handle_request */
1670
1671 /* MUST NOT hold journal_lock before calling this */
1672 static void journal_rotate(void) /* {{{ */
1673 {
1674   FILE *old_fh = NULL;
1675   int new_fd;
1676
1677   if (journal_cur == NULL || journal_old == NULL)
1678     return;
1679
1680   pthread_mutex_lock(&journal_lock);
1681
1682   /* we rotate this way (rename before close) so that the we can release
1683    * the journal lock as fast as possible.  Journal writes to the new
1684    * journal can proceed immediately after the new file is opened.  The
1685    * fclose can then block without affecting new updates.
1686    */
1687   if (journal_fh != NULL)
1688   {
1689     old_fh = journal_fh;
1690     journal_fh = NULL;
1691     rename(journal_cur, journal_old);
1692     ++stats_journal_rotate;
1693   }
1694
1695   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1696                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1697   if (new_fd >= 0)
1698   {
1699     journal_fh = fdopen(new_fd, "a");
1700     if (journal_fh == NULL)
1701       close(new_fd);
1702   }
1703
1704   pthread_mutex_unlock(&journal_lock);
1705
1706   if (old_fh != NULL)
1707     fclose(old_fh);
1708
1709   if (journal_fh == NULL)
1710   {
1711     RRDD_LOG(LOG_CRIT,
1712              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1713              journal_cur, rrd_strerror(errno));
1714
1715     RRDD_LOG(LOG_ERR,
1716              "JOURNALING DISABLED: All values will be flushed at shutdown");
1717     config_flush_at_shutdown = 1;
1718   }
1719
1720 } /* }}} static void journal_rotate */
1721
1722 static void journal_done(void) /* {{{ */
1723 {
1724   if (journal_cur == NULL)
1725     return;
1726
1727   pthread_mutex_lock(&journal_lock);
1728   if (journal_fh != NULL)
1729   {
1730     fclose(journal_fh);
1731     journal_fh = NULL;
1732   }
1733
1734   if (config_flush_at_shutdown)
1735   {
1736     RRDD_LOG(LOG_INFO, "removing journals");
1737     unlink(journal_old);
1738     unlink(journal_cur);
1739   }
1740   else
1741   {
1742     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1743              "journals will be used at next startup");
1744   }
1745
1746   pthread_mutex_unlock(&journal_lock);
1747
1748 } /* }}} static void journal_done */
1749
1750 static int journal_write(char *cmd, char *args) /* {{{ */
1751 {
1752   int chars;
1753
1754   if (journal_fh == NULL)
1755     return 0;
1756
1757   pthread_mutex_lock(&journal_lock);
1758   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1759   pthread_mutex_unlock(&journal_lock);
1760
1761   if (chars > 0)
1762   {
1763     pthread_mutex_lock(&stats_lock);
1764     stats_journal_bytes += chars;
1765     pthread_mutex_unlock(&stats_lock);
1766   }
1767
1768   return chars;
1769 } /* }}} static int journal_write */
1770
1771 static int journal_replay (const char *file) /* {{{ */
1772 {
1773   FILE *fh;
1774   int entry_cnt = 0;
1775   int fail_cnt = 0;
1776   uint64_t line = 0;
1777   char entry[CMD_MAX];
1778   time_t now;
1779
1780   if (file == NULL) return 0;
1781
1782   {
1783     char *reason;
1784     int status = 0;
1785     struct stat statbuf;
1786
1787     memset(&statbuf, 0, sizeof(statbuf));
1788     if (stat(file, &statbuf) != 0)
1789     {
1790       if (errno == ENOENT)
1791         return 0;
1792
1793       reason = "stat error";
1794       status = errno;
1795     }
1796     else if (!S_ISREG(statbuf.st_mode))
1797     {
1798       reason = "not a regular file";
1799       status = EPERM;
1800     }
1801     if (statbuf.st_uid != daemon_uid)
1802     {
1803       reason = "not owned by daemon user";
1804       status = EACCES;
1805     }
1806     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1807     {
1808       reason = "must not be user/group writable";
1809       status = EACCES;
1810     }
1811
1812     if (status != 0)
1813     {
1814       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1815                file, rrd_strerror(status), reason);
1816       return 0;
1817     }
1818   }
1819
1820   fh = fopen(file, "r");
1821   if (fh == NULL)
1822   {
1823     if (errno != ENOENT)
1824       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1825                file, rrd_strerror(errno));
1826     return 0;
1827   }
1828   else
1829     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1830
1831   now = time(NULL);
1832
1833   while(!feof(fh))
1834   {
1835     size_t entry_len;
1836
1837     ++line;
1838     if (fgets(entry, sizeof(entry), fh) == NULL)
1839       break;
1840     entry_len = strlen(entry);
1841
1842     /* check \n termination in case journal writing crashed mid-line */
1843     if (entry_len == 0)
1844       continue;
1845     else if (entry[entry_len - 1] != '\n')
1846     {
1847       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1848       ++fail_cnt;
1849       continue;
1850     }
1851
1852     entry[entry_len - 1] = '\0';
1853
1854     if (handle_request(NULL, now, entry, entry_len) == 0)
1855       ++entry_cnt;
1856     else
1857       ++fail_cnt;
1858   }
1859
1860   fclose(fh);
1861
1862   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1863            entry_cnt, fail_cnt);
1864
1865   return entry_cnt > 0 ? 1 : 0;
1866 } /* }}} static int journal_replay */
1867
1868 static void journal_init(void) /* {{{ */
1869 {
1870   int had_journal = 0;
1871
1872   if (journal_cur == NULL) return;
1873
1874   pthread_mutex_lock(&journal_lock);
1875
1876   RRDD_LOG(LOG_INFO, "checking for journal files");
1877
1878   had_journal += journal_replay(journal_old);
1879   had_journal += journal_replay(journal_cur);
1880
1881   /* it must have been a crash.  start a flush */
1882   if (had_journal && config_flush_at_shutdown)
1883     flush_old_values(-1);
1884
1885   pthread_mutex_unlock(&journal_lock);
1886   journal_rotate();
1887
1888   RRDD_LOG(LOG_INFO, "journal processing complete");
1889
1890 } /* }}} static void journal_init */
1891
1892 static void close_connection(listen_socket_t *sock)
1893 {
1894   close(sock->fd) ;  sock->fd   = -1;
1895   free(sock->rbuf);  sock->rbuf = NULL;
1896   free(sock->wbuf);  sock->wbuf = NULL;
1897
1898   free(sock);
1899 }
1900
1901 static void *connection_thread_main (void *args) /* {{{ */
1902 {
1903   pthread_t self;
1904   listen_socket_t *sock;
1905   int i;
1906   int fd;
1907
1908   sock = (listen_socket_t *) args;
1909   fd = sock->fd;
1910
1911   /* init read buffers */
1912   sock->next_read = sock->next_cmd = 0;
1913   sock->rbuf = malloc(RBUF_SIZE);
1914   if (sock->rbuf == NULL)
1915   {
1916     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1917     close_connection(sock);
1918     return NULL;
1919   }
1920
1921   pthread_mutex_lock (&connection_threads_lock);
1922   {
1923     pthread_t *temp;
1924
1925     temp = (pthread_t *) realloc (connection_threads,
1926         sizeof (pthread_t) * (connection_threads_num + 1));
1927     if (temp == NULL)
1928     {
1929       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1930     }
1931     else
1932     {
1933       connection_threads = temp;
1934       connection_threads[connection_threads_num] = pthread_self ();
1935       connection_threads_num++;
1936     }
1937   }
1938   pthread_mutex_unlock (&connection_threads_lock);
1939
1940   while (do_shutdown == 0)
1941   {
1942     char *cmd;
1943     ssize_t cmd_len;
1944     ssize_t rbytes;
1945     time_t now;
1946
1947     struct pollfd pollfd;
1948     int status;
1949
1950     pollfd.fd = fd;
1951     pollfd.events = POLLIN | POLLPRI;
1952     pollfd.revents = 0;
1953
1954     status = poll (&pollfd, 1, /* timeout = */ 500);
1955     if (do_shutdown)
1956       break;
1957     else if (status == 0) /* timeout */
1958       continue;
1959     else if (status < 0) /* error */
1960     {
1961       status = errno;
1962       if (status != EINTR)
1963         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1964       continue;
1965     }
1966
1967     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1968       break;
1969     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1970     {
1971       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1972           "poll(2) returned something unexpected: %#04hx",
1973           pollfd.revents);
1974       break;
1975     }
1976
1977     rbytes = read(fd, sock->rbuf + sock->next_read,
1978                   RBUF_SIZE - sock->next_read);
1979     if (rbytes < 0)
1980     {
1981       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1982       break;
1983     }
1984     else if (rbytes == 0)
1985       break; /* eof */
1986
1987     sock->next_read += rbytes;
1988
1989     if (sock->batch_start)
1990       now = sock->batch_start;
1991     else
1992       now = time(NULL);
1993
1994     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1995     {
1996       status = handle_request (sock, now, cmd, cmd_len+1);
1997       if (status != 0)
1998         goto out_close;
1999     }
2000   }
2001
2002 out_close:
2003   close_connection(sock);
2004
2005   self = pthread_self ();
2006   /* Remove this thread from the connection threads list */
2007   pthread_mutex_lock (&connection_threads_lock);
2008   /* Find out own index in the array */
2009   for (i = 0; i < connection_threads_num; i++)
2010     if (pthread_equal (connection_threads[i], self) != 0)
2011       break;
2012   assert (i < connection_threads_num);
2013
2014   /* Move the trailing threads forward. */
2015   if (i < (connection_threads_num - 1))
2016   {
2017     memmove (connection_threads + i,
2018         connection_threads + i + 1,
2019         sizeof (pthread_t) * (connection_threads_num - i - 1));
2020   }
2021
2022   connection_threads_num--;
2023   pthread_mutex_unlock (&connection_threads_lock);
2024
2025   return (NULL);
2026 } /* }}} void *connection_thread_main */
2027
2028 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2029 {
2030   int fd;
2031   struct sockaddr_un sa;
2032   listen_socket_t *temp;
2033   int status;
2034   const char *path;
2035
2036   path = sock->addr;
2037   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2038     path += strlen("unix:");
2039
2040   temp = (listen_socket_t *) realloc (listen_fds,
2041       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2042   if (temp == NULL)
2043   {
2044     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2045     return (-1);
2046   }
2047   listen_fds = temp;
2048   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2049
2050   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2051   if (fd < 0)
2052   {
2053     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2054              rrd_strerror(errno));
2055     return (-1);
2056   }
2057
2058   memset (&sa, 0, sizeof (sa));
2059   sa.sun_family = AF_UNIX;
2060   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2061
2062   /* if we've gotten this far, we own the pid file.  any daemon started
2063    * with the same args must not be alive.  therefore, ensure that we can
2064    * create the socket...
2065    */
2066   unlink(path);
2067
2068   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2069   if (status != 0)
2070   {
2071     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2072              path, rrd_strerror(errno));
2073     close (fd);
2074     return (-1);
2075   }
2076
2077   status = listen (fd, /* backlog = */ 10);
2078   if (status != 0)
2079   {
2080     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2081              path, rrd_strerror(errno));
2082     close (fd);
2083     unlink (path);
2084     return (-1);
2085   }
2086
2087   listen_fds[listen_fds_num].fd = fd;
2088   listen_fds[listen_fds_num].family = PF_UNIX;
2089   strncpy(listen_fds[listen_fds_num].addr, path,
2090           sizeof (listen_fds[listen_fds_num].addr) - 1);
2091   listen_fds_num++;
2092
2093   return (0);
2094 } /* }}} int open_listen_socket_unix */
2095
2096 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2097 {
2098   struct addrinfo ai_hints;
2099   struct addrinfo *ai_res;
2100   struct addrinfo *ai_ptr;
2101   char addr_copy[NI_MAXHOST];
2102   char *addr;
2103   char *port;
2104   int status;
2105
2106   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2107   addr_copy[sizeof (addr_copy) - 1] = 0;
2108   addr = addr_copy;
2109
2110   memset (&ai_hints, 0, sizeof (ai_hints));
2111   ai_hints.ai_flags = 0;
2112 #ifdef AI_ADDRCONFIG
2113   ai_hints.ai_flags |= AI_ADDRCONFIG;
2114 #endif
2115   ai_hints.ai_family = AF_UNSPEC;
2116   ai_hints.ai_socktype = SOCK_STREAM;
2117
2118   port = NULL;
2119   if (*addr == '[') /* IPv6+port format */
2120   {
2121     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2122     addr++;
2123
2124     port = strchr (addr, ']');
2125     if (port == NULL)
2126     {
2127       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2128       return (-1);
2129     }
2130     *port = 0;
2131     port++;
2132
2133     if (*port == ':')
2134       port++;
2135     else if (*port == 0)
2136       port = NULL;
2137     else
2138     {
2139       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2140       return (-1);
2141     }
2142   } /* if (*addr = ']') */
2143   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2144   {
2145     port = rindex(addr, ':');
2146     if (port != NULL)
2147     {
2148       *port = 0;
2149       port++;
2150     }
2151   }
2152   ai_res = NULL;
2153   status = getaddrinfo (addr,
2154                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2155                         &ai_hints, &ai_res);
2156   if (status != 0)
2157   {
2158     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2159              addr, gai_strerror (status));
2160     return (-1);
2161   }
2162
2163   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2164   {
2165     int fd;
2166     listen_socket_t *temp;
2167     int one = 1;
2168
2169     temp = (listen_socket_t *) realloc (listen_fds,
2170         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2171     if (temp == NULL)
2172     {
2173       fprintf (stderr,
2174                "rrdcached: open_listen_socket_network: realloc failed.\n");
2175       continue;
2176     }
2177     listen_fds = temp;
2178     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2179
2180     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2181     if (fd < 0)
2182     {
2183       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2184                rrd_strerror(errno));
2185       continue;
2186     }
2187
2188     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2189
2190     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2191     if (status != 0)
2192     {
2193       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2194                sock->addr, rrd_strerror(errno));
2195       close (fd);
2196       continue;
2197     }
2198
2199     status = listen (fd, /* backlog = */ 10);
2200     if (status != 0)
2201     {
2202       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2203                sock->addr, rrd_strerror(errno));
2204       close (fd);
2205       return (-1);
2206     }
2207
2208     listen_fds[listen_fds_num].fd = fd;
2209     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2210     listen_fds_num++;
2211   } /* for (ai_ptr) */
2212
2213   return (0);
2214 } /* }}} static int open_listen_socket_network */
2215
2216 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2217 {
2218   assert(sock != NULL);
2219   assert(sock->addr != NULL);
2220
2221   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2222       || sock->addr[0] == '/')
2223     return (open_listen_socket_unix(sock));
2224   else
2225     return (open_listen_socket_network(sock));
2226 } /* }}} int open_listen_socket */
2227
2228 static int close_listen_sockets (void) /* {{{ */
2229 {
2230   size_t i;
2231
2232   for (i = 0; i < listen_fds_num; i++)
2233   {
2234     close (listen_fds[i].fd);
2235
2236     if (listen_fds[i].family == PF_UNIX)
2237       unlink(listen_fds[i].addr);
2238   }
2239
2240   free (listen_fds);
2241   listen_fds = NULL;
2242   listen_fds_num = 0;
2243
2244   return (0);
2245 } /* }}} int close_listen_sockets */
2246
2247 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2248 {
2249   struct pollfd *pollfds;
2250   int pollfds_num;
2251   int status;
2252   int i;
2253
2254   if (listen_fds_num < 1)
2255   {
2256     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2257     return (NULL);
2258   }
2259
2260   pollfds_num = listen_fds_num;
2261   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2262   if (pollfds == NULL)
2263   {
2264     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2265     return (NULL);
2266   }
2267   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2268
2269   RRDD_LOG(LOG_INFO, "listening for connections");
2270
2271   while (do_shutdown == 0)
2272   {
2273     assert (pollfds_num == ((int) listen_fds_num));
2274     for (i = 0; i < pollfds_num; i++)
2275     {
2276       pollfds[i].fd = listen_fds[i].fd;
2277       pollfds[i].events = POLLIN | POLLPRI;
2278       pollfds[i].revents = 0;
2279     }
2280
2281     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2282     if (do_shutdown)
2283       break;
2284     else if (status == 0) /* timeout */
2285       continue;
2286     else if (status < 0) /* error */
2287     {
2288       status = errno;
2289       if (status != EINTR)
2290       {
2291         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2292       }
2293       continue;
2294     }
2295
2296     for (i = 0; i < pollfds_num; i++)
2297     {
2298       listen_socket_t *client_sock;
2299       struct sockaddr_storage client_sa;
2300       socklen_t client_sa_size;
2301       pthread_t tid;
2302       pthread_attr_t attr;
2303
2304       if (pollfds[i].revents == 0)
2305         continue;
2306
2307       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2308       {
2309         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2310             "poll(2) returned something unexpected for listen FD #%i.",
2311             pollfds[i].fd);
2312         continue;
2313       }
2314
2315       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2316       if (client_sock == NULL)
2317       {
2318         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2319         continue;
2320       }
2321       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2322
2323       client_sa_size = sizeof (client_sa);
2324       client_sock->fd = accept (pollfds[i].fd,
2325           (struct sockaddr *) &client_sa, &client_sa_size);
2326       if (client_sock->fd < 0)
2327       {
2328         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2329         free(client_sock);
2330         continue;
2331       }
2332
2333       pthread_attr_init (&attr);
2334       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2335
2336       status = pthread_create (&tid, &attr, connection_thread_main,
2337                                client_sock);
2338       if (status != 0)
2339       {
2340         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2341         close_connection(client_sock);
2342         continue;
2343       }
2344     } /* for (pollfds_num) */
2345   } /* while (do_shutdown == 0) */
2346
2347   RRDD_LOG(LOG_INFO, "starting shutdown");
2348
2349   close_listen_sockets ();
2350
2351   pthread_mutex_lock (&connection_threads_lock);
2352   while (connection_threads_num > 0)
2353   {
2354     pthread_t wait_for;
2355
2356     wait_for = connection_threads[0];
2357
2358     pthread_mutex_unlock (&connection_threads_lock);
2359     pthread_join (wait_for, /* retval = */ NULL);
2360     pthread_mutex_lock (&connection_threads_lock);
2361   }
2362   pthread_mutex_unlock (&connection_threads_lock);
2363
2364   return (NULL);
2365 } /* }}} void *listen_thread_main */
2366
2367 static int daemonize (void) /* {{{ */
2368 {
2369   int pid_fd;
2370   char *base_dir;
2371
2372   daemon_uid = geteuid();
2373
2374   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2375   if (pid_fd < 0)
2376     pid_fd = check_pidfile();
2377   if (pid_fd < 0)
2378     return pid_fd;
2379
2380   /* open all the listen sockets */
2381   if (config_listen_address_list_len > 0)
2382   {
2383     for (int i = 0; i < config_listen_address_list_len; i++)
2384       open_listen_socket (config_listen_address_list[i]);
2385   }
2386   else
2387   {
2388     listen_socket_t sock;
2389     memset(&sock, 0, sizeof(sock));
2390     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2391     open_listen_socket (&sock);
2392   }
2393
2394   if (listen_fds_num < 1)
2395   {
2396     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2397     goto error;
2398   }
2399
2400   if (!stay_foreground)
2401   {
2402     pid_t child;
2403
2404     child = fork ();
2405     if (child < 0)
2406     {
2407       fprintf (stderr, "daemonize: fork(2) failed.\n");
2408       goto error;
2409     }
2410     else if (child > 0)
2411       exit(0);
2412
2413     /* Become session leader */
2414     setsid ();
2415
2416     /* Open the first three file descriptors to /dev/null */
2417     close (2);
2418     close (1);
2419     close (0);
2420
2421     open ("/dev/null", O_RDWR);
2422     dup (0);
2423     dup (0);
2424   } /* if (!stay_foreground) */
2425
2426   /* Change into the /tmp directory. */
2427   base_dir = (config_base_dir != NULL)
2428     ? config_base_dir
2429     : "/tmp";
2430
2431   if (chdir (base_dir) != 0)
2432   {
2433     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2434     goto error;
2435   }
2436
2437   install_signal_handlers();
2438
2439   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2440   RRDD_LOG(LOG_INFO, "starting up");
2441
2442   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2443   if (cache_tree == NULL)
2444   {
2445     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2446     goto error;
2447   }
2448
2449   return write_pidfile (pid_fd);
2450
2451 error:
2452   remove_pidfile();
2453   return -1;
2454 } /* }}} int daemonize */
2455
2456 static int cleanup (void) /* {{{ */
2457 {
2458   do_shutdown++;
2459
2460   pthread_cond_signal (&cache_cond);
2461   pthread_join (queue_thread, /* return = */ NULL);
2462
2463   remove_pidfile ();
2464
2465   RRDD_LOG(LOG_INFO, "goodbye");
2466   closelog ();
2467
2468   return (0);
2469 } /* }}} int cleanup */
2470
2471 static int read_options (int argc, char **argv) /* {{{ */
2472 {
2473   int option;
2474   int status = 0;
2475
2476   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2477   {
2478     switch (option)
2479     {
2480       case 'g':
2481         stay_foreground=1;
2482         break;
2483
2484       case 'L':
2485       case 'l':
2486       {
2487         listen_socket_t **temp;
2488         listen_socket_t *new;
2489
2490         new = malloc(sizeof(listen_socket_t));
2491         if (new == NULL)
2492         {
2493           fprintf(stderr, "read_options: malloc failed.\n");
2494           return(2);
2495         }
2496         memset(new, 0, sizeof(listen_socket_t));
2497
2498         temp = (listen_socket_t **) realloc (config_listen_address_list,
2499             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2500         if (temp == NULL)
2501         {
2502           fprintf (stderr, "read_options: realloc failed.\n");
2503           return (2);
2504         }
2505         config_listen_address_list = temp;
2506
2507         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2508         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2509
2510         temp[config_listen_address_list_len] = new;
2511         config_listen_address_list_len++;
2512       }
2513       break;
2514
2515       case 'f':
2516       {
2517         int temp;
2518
2519         temp = atoi (optarg);
2520         if (temp > 0)
2521           config_flush_interval = temp;
2522         else
2523         {
2524           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2525           status = 3;
2526         }
2527       }
2528       break;
2529
2530       case 'w':
2531       {
2532         int temp;
2533
2534         temp = atoi (optarg);
2535         if (temp > 0)
2536           config_write_interval = temp;
2537         else
2538         {
2539           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2540           status = 2;
2541         }
2542       }
2543       break;
2544
2545       case 'z':
2546       {
2547         int temp;
2548
2549         temp = atoi(optarg);
2550         if (temp > 0)
2551           config_write_jitter = temp;
2552         else
2553         {
2554           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2555           status = 2;
2556         }
2557
2558         break;
2559       }
2560
2561       case 'B':
2562         config_write_base_only = 1;
2563         break;
2564
2565       case 'b':
2566       {
2567         size_t len;
2568         char base_realpath[PATH_MAX];
2569
2570         if (config_base_dir != NULL)
2571           free (config_base_dir);
2572         config_base_dir = strdup (optarg);
2573         if (config_base_dir == NULL)
2574         {
2575           fprintf (stderr, "read_options: strdup failed.\n");
2576           return (3);
2577         }
2578
2579         /* make sure that the base directory is not resolved via
2580          * symbolic links.  this makes some performance-enhancing
2581          * assumptions possible (we don't have to resolve paths
2582          * that start with a "/")
2583          */
2584         if (realpath(config_base_dir, base_realpath) == NULL)
2585         {
2586           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2587           return 5;
2588         }
2589         else if (strncmp(config_base_dir,
2590                          base_realpath, sizeof(base_realpath)) != 0)
2591         {
2592           fprintf(stderr,
2593                   "Base directory (-b) resolved via file system links!\n"
2594                   "Please consult rrdcached '-b' documentation!\n"
2595                   "Consider specifying the real directory (%s)\n",
2596                   base_realpath);
2597           return 5;
2598         }
2599
2600         len = strlen (config_base_dir);
2601         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2602         {
2603           config_base_dir[len - 1] = 0;
2604           len--;
2605         }
2606
2607         if (len < 1)
2608         {
2609           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2610           return (4);
2611         }
2612
2613         _config_base_dir_len = len;
2614       }
2615       break;
2616
2617       case 'p':
2618       {
2619         if (config_pid_file != NULL)
2620           free (config_pid_file);
2621         config_pid_file = strdup (optarg);
2622         if (config_pid_file == NULL)
2623         {
2624           fprintf (stderr, "read_options: strdup failed.\n");
2625           return (3);
2626         }
2627       }
2628       break;
2629
2630       case 'F':
2631         config_flush_at_shutdown = 1;
2632         break;
2633
2634       case 'j':
2635       {
2636         struct stat statbuf;
2637         const char *dir = optarg;
2638
2639         status = stat(dir, &statbuf);
2640         if (status != 0)
2641         {
2642           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2643           return 6;
2644         }
2645
2646         if (!S_ISDIR(statbuf.st_mode)
2647             || access(dir, R_OK|W_OK|X_OK) != 0)
2648         {
2649           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2650                   errno ? rrd_strerror(errno) : "");
2651           return 6;
2652         }
2653
2654         journal_cur = malloc(PATH_MAX + 1);
2655         journal_old = malloc(PATH_MAX + 1);
2656         if (journal_cur == NULL || journal_old == NULL)
2657         {
2658           fprintf(stderr, "malloc failure for journal files\n");
2659           return 6;
2660         }
2661         else 
2662         {
2663           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2664           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2665         }
2666       }
2667       break;
2668
2669       case 'h':
2670       case '?':
2671         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2672             "\n"
2673             "Usage: rrdcached [options]\n"
2674             "\n"
2675             "Valid options are:\n"
2676             "  -l <address>  Socket address to listen to.\n"
2677             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2678             "  -w <seconds>  Interval in which to write data.\n"
2679             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2680             "  -f <seconds>  Interval in which to flush dead data.\n"
2681             "  -p <file>     Location of the PID-file.\n"
2682             "  -b <dir>      Base directory to change to.\n"
2683             "  -B            Restrict file access to paths within -b <dir>\n"
2684             "  -g            Do not fork and run in the foreground.\n"
2685             "  -j <dir>      Directory in which to create the journal files.\n"
2686             "  -F            Always flush all updates at shutdown\n"
2687             "\n"
2688             "For more information and a detailed description of all options "
2689             "please refer\n"
2690             "to the rrdcached(1) manual page.\n",
2691             VERSION);
2692         status = -1;
2693         break;
2694     } /* switch (option) */
2695   } /* while (getopt) */
2696
2697   /* advise the user when values are not sane */
2698   if (config_flush_interval < 2 * config_write_interval)
2699     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2700             " 2x write interval (-w) !\n");
2701   if (config_write_jitter > config_write_interval)
2702     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2703             " write interval (-w) !\n");
2704
2705   if (config_write_base_only && config_base_dir == NULL)
2706     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2707             "  Consult the rrdcached documentation\n");
2708
2709   if (journal_cur == NULL)
2710     config_flush_at_shutdown = 1;
2711
2712   return (status);
2713 } /* }}} int read_options */
2714
2715 int main (int argc, char **argv)
2716 {
2717   int status;
2718
2719   status = read_options (argc, argv);
2720   if (status != 0)
2721   {
2722     if (status < 0)
2723       status = 0;
2724     return (status);
2725   }
2726
2727   status = daemonize ();
2728   if (status != 0)
2729   {
2730     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2731     return (1);
2732   }
2733
2734   journal_init();
2735
2736   /* start the queue thread */
2737   memset (&queue_thread, 0, sizeof (queue_thread));
2738   status = pthread_create (&queue_thread,
2739                            NULL, /* attr */
2740                            queue_thread_main,
2741                            NULL); /* args */
2742   if (status != 0)
2743   {
2744     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2745     cleanup();
2746     return (1);
2747   }
2748
2749   listen_thread_main (NULL);
2750   cleanup ();
2751
2752   return (0);
2753 } /* int main */
2754
2755 /*
2756  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2757  */