rrdcached: examine the current queue with the "QUEUE" command
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
85
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
100
101 #include <glib-2.0/glib.h>
102 /* }}} */
103
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
105
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
109
110 /*
111  * Types
112  */
113 typedef enum
114 {
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
118
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
127
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
131
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
136
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
141
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
158
159 struct callback_flush_data_s
160 {
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
167
168 enum queue_side_e
169 {
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
174
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
178
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
184
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
187
188 static int do_shutdown = 0;
189
190 static pthread_t queue_thread;
191
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
195
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
202
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
211
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
214
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
223
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
232
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
242
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
247
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
252
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
258
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
264
265 static void install_signal_handlers(void) /* {{{ */
266 {
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
274
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
279
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
283
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
287
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
291
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
295
296 } /* }}} void install_signal_handlers */
297
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300   int fd;
301   char *file;
302
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
306
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
311
312   return(fd);
313 } /* }}} static int open_pidfile */
314
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
317 {
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
321
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
325
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
328
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
332
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
342
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
345
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
349
350   return pid_fd;
351 } /* }}} static int check_pidfile */
352
353 static int write_pidfile (int fd) /* {{{ */
354 {
355   pid_t pid;
356   FILE *fh;
357
358   pid = getpid ();
359
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
367
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
370
371   return (0);
372 } /* }}} int write_pidfile */
373
374 static int remove_pidfile (void) /* {{{ */
375 {
376   char *file;
377   int status;
378
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
382
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
388
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391   char *eol;
392
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
395
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
410
411     sock->next_cmd = eol - sock->rbuf + 1;
412
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
415
416     *len = eol - cmd;
417
418     return cmd;
419   }
420
421   /* NOTREACHED */
422   assert(1==0);
423 }
424
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428   char *new_buf;
429
430   assert(sock != NULL);
431
432   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
438
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
440
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
443
444   return 0;
445 } /* }}} static int add_to_wbuf */
446
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
453
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
456
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
469
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
472
473 static int count_lines(char *str) /* {{{ */
474 {
475   int lines = 0;
476
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
485
486   return lines;
487 } /* }}} static int count_lines */
488
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
494 {
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
500
501   if (sock == NULL) return rc;  /* journal replay mode */
502
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
513
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
524
525   len += rclen;
526
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
530
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
537
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
552
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
555
556   return 0;
557 } /* }}} */
558
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561   ci->values = NULL;
562   ci->values_num = 0;
563
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
567 }
568
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
577
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
582
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
587
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
591
592 /* free the resources associated with the cache_item_t
593  * must hold cache_lock when calling this function
594  */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
596 {
597   if (ci == NULL) return NULL;
598
599   remove_from_queue(ci);
600
601   for (int i=0; i < ci->values_num; i++)
602     free(ci->values[i]);
603
604   free (ci->values);
605   free (ci->file);
606
607   /* in case anyone is waiting */
608   pthread_cond_broadcast(&ci->flushed);
609
610   free (ci);
611
612   return NULL;
613 } /* }}} static void *free_cache_item */
614
615 /*
616  * enqueue_cache_item:
617  * `cache_lock' must be acquired before calling this function!
618  */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620     queue_side_t side)
621 {
622   if (ci == NULL)
623     return (-1);
624
625   if (ci->values_num == 0)
626     return (0);
627
628   if (side == HEAD)
629   {
630     if (cache_queue_head == ci)
631       return 0;
632
633     /* remove if further down in queue */
634     remove_from_queue(ci);
635
636     ci->prev = NULL;
637     ci->next = cache_queue_head;
638     if (ci->next != NULL)
639       ci->next->prev = ci;
640     cache_queue_head = ci;
641
642     if (cache_queue_tail == NULL)
643       cache_queue_tail = cache_queue_head;
644   }
645   else /* (side == TAIL) */
646   {
647     /* We don't move values back in the list.. */
648     if (ci->flags & CI_FLAGS_IN_QUEUE)
649       return (0);
650
651     assert (ci->next == NULL);
652     assert (ci->prev == NULL);
653
654     ci->prev = cache_queue_tail;
655
656     if (cache_queue_tail == NULL)
657       cache_queue_head = ci;
658     else
659       cache_queue_tail->next = ci;
660
661     cache_queue_tail = ci;
662   }
663
664   ci->flags |= CI_FLAGS_IN_QUEUE;
665
666   pthread_cond_broadcast(&cache_cond);
667   pthread_mutex_lock (&stats_lock);
668   stats_queue_length++;
669   pthread_mutex_unlock (&stats_lock);
670
671   return (0);
672 } /* }}} int enqueue_cache_item */
673
674 /*
675  * tree_callback_flush:
676  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677  * while this is in progress.
678  */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680     gpointer data)
681 {
682   cache_item_t *ci;
683   callback_flush_data_t *cfd;
684
685   ci = (cache_item_t *) value;
686   cfd = (callback_flush_data_t *) data;
687
688   if (ci->flags & CI_FLAGS_IN_QUEUE)
689     return FALSE;
690
691   if ((ci->last_flush_time <= cfd->abs_timeout)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if ((do_shutdown != 0)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702       && (ci->values_num <= 0))
703   {
704     char **temp;
705
706     temp = (char **) rrd_realloc (cfd->keys,
707         sizeof (char *) * (cfd->keys_num + 1));
708     if (temp == NULL)
709     {
710       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711       return (FALSE);
712     }
713     cfd->keys = temp;
714     /* Make really sure this points to the _same_ place */
715     assert ((char *) key == ci->file);
716     cfd->keys[cfd->keys_num] = (char *) key;
717     cfd->keys_num++;
718   }
719
720   return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
722
723 static int flush_old_values (int max_age)
724 {
725   callback_flush_data_t cfd;
726   size_t k;
727
728   memset (&cfd, 0, sizeof (cfd));
729   /* Pass the current time as user data so that we don't need to call
730    * `time' for each node. */
731   cfd.now = time (NULL);
732   cfd.keys = NULL;
733   cfd.keys_num = 0;
734
735   if (max_age > 0)
736     cfd.abs_timeout = cfd.now - max_age;
737   else
738     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
739
740   /* `tree_callback_flush' will return the keys of all values that haven't
741    * been touched in the last `config_flush_interval' seconds in `cfd'.
742    * The char*'s in this array point to the same memory as ci->file, so we
743    * don't need to free them separately. */
744   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
745
746   for (k = 0; k < cfd.keys_num; k++)
747   {
748     /* should never fail, since we have held the cache_lock
749      * the entire time */
750     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751   }
752
753   if (cfd.keys != NULL)
754   {
755     free (cfd.keys);
756     cfd.keys = NULL;
757   }
758
759   return (0);
760 } /* int flush_old_values */
761
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
763 {
764   struct timeval now;
765   struct timespec next_flush;
766   int final_flush = 0; /* make sure we only flush once on shutdown */
767
768   gettimeofday (&now, NULL);
769   next_flush.tv_sec = now.tv_sec + config_flush_interval;
770   next_flush.tv_nsec = 1000 * now.tv_usec;
771
772   pthread_mutex_lock (&cache_lock);
773   while ((do_shutdown == 0) || (cache_queue_head != NULL))
774   {
775     cache_item_t *ci;
776     char *file;
777     char **values;
778     int values_num;
779     int status;
780     int i;
781
782     /* First, check if it's time to do the cache flush. */
783     gettimeofday (&now, NULL);
784     if ((now.tv_sec > next_flush.tv_sec)
785         || ((now.tv_sec == next_flush.tv_sec)
786           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787     {
788       /* Flush all values that haven't been written in the last
789        * `config_write_interval' seconds. */
790       flush_old_values (config_write_interval);
791
792       /* Determine the time of the next cache flush. */
793       next_flush.tv_sec =
794         now.tv_sec + next_flush.tv_sec % config_flush_interval;
795
796       /* unlock the cache while we rotate so we don't block incoming
797        * updates if the fsync() blocks on disk I/O */
798       pthread_mutex_unlock(&cache_lock);
799       journal_rotate();
800       pthread_mutex_lock(&cache_lock);
801     }
802
803     /* Now, check if there's something to store away. If not, wait until
804      * something comes in or it's time to do the cache flush.  if we are
805      * shutting down, do not wait around.  */
806     if (cache_queue_head == NULL && !do_shutdown)
807     {
808       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809       if ((status != 0) && (status != ETIMEDOUT))
810       {
811         RRDD_LOG (LOG_ERR, "queue_thread_main: "
812             "pthread_cond_timedwait returned %i.", status);
813       }
814     }
815
816     /* We're about to shut down */
817     if (do_shutdown != 0 && !final_flush++)
818     {
819       if (config_flush_at_shutdown)
820         flush_old_values (-1); /* flush everything */
821       else
822         break;
823     }
824
825     /* Check if a value has arrived. This may be NULL if we timed out or there
826      * was an interrupt such as a signal. */
827     if (cache_queue_head == NULL)
828       continue;
829
830     ci = cache_queue_head;
831
832     /* copy the relevant parts */
833     file = strdup (ci->file);
834     if (file == NULL)
835     {
836       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837       continue;
838     }
839
840     assert(ci->values != NULL);
841     assert(ci->values_num > 0);
842
843     values = ci->values;
844     values_num = ci->values_num;
845
846     wipe_ci_values(ci, time(NULL));
847     remove_from_queue(ci);
848
849     pthread_mutex_lock (&stats_lock);
850     assert (stats_queue_length > 0);
851     stats_queue_length--;
852     pthread_mutex_unlock (&stats_lock);
853
854     pthread_mutex_unlock (&cache_lock);
855
856     rrd_clear_error ();
857     status = rrd_update_r (file, NULL, values_num, (void *) values);
858     if (status != 0)
859     {
860       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861           "rrd_update_r (%s) failed with status %i. (%s)",
862           file, status, rrd_get_error());
863     }
864
865     journal_write("wrote", file);
866     pthread_cond_broadcast(&ci->flushed);
867
868     for (i = 0; i < values_num; i++)
869       free (values[i]);
870
871     free(values);
872     free(file);
873
874     if (status == 0)
875     {
876       pthread_mutex_lock (&stats_lock);
877       stats_updates_written++;
878       stats_data_sets_written += values_num;
879       pthread_mutex_unlock (&stats_lock);
880     }
881
882     pthread_mutex_lock (&cache_lock);
883
884     /* We're about to shut down */
885     if (do_shutdown != 0 && !final_flush++)
886     {
887       if (config_flush_at_shutdown)
888           flush_old_values (-1); /* flush everything */
889       else
890         break;
891     }
892   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893   pthread_mutex_unlock (&cache_lock);
894
895   if (config_flush_at_shutdown)
896   {
897     assert(cache_queue_head == NULL);
898     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899   }
900
901   journal_done();
902
903   return (NULL);
904 } /* }}} void *queue_thread_main */
905
906 static int buffer_get_field (char **buffer_ret, /* {{{ */
907     size_t *buffer_size_ret, char **field_ret)
908 {
909   char *buffer;
910   size_t buffer_pos;
911   size_t buffer_size;
912   char *field;
913   size_t field_size;
914   int status;
915
916   buffer = *buffer_ret;
917   buffer_pos = 0;
918   buffer_size = *buffer_size_ret;
919   field = *buffer_ret;
920   field_size = 0;
921
922   if (buffer_size <= 0)
923     return (-1);
924
925   /* This is ensured by `handle_request'. */
926   assert (buffer[buffer_size - 1] == '\0');
927
928   status = -1;
929   while (buffer_pos < buffer_size)
930   {
931     /* Check for end-of-field or end-of-buffer */
932     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
933     {
934       field[field_size] = 0;
935       field_size++;
936       buffer_pos++;
937       status = 0;
938       break;
939     }
940     /* Handle escaped characters. */
941     else if (buffer[buffer_pos] == '\\')
942     {
943       if (buffer_pos >= (buffer_size - 1))
944         break;
945       buffer_pos++;
946       field[field_size] = buffer[buffer_pos];
947       field_size++;
948       buffer_pos++;
949     }
950     /* Normal operation */ 
951     else
952     {
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957   } /* while (buffer_pos < buffer_size) */
958
959   if (status != 0)
960     return (status);
961
962   *buffer_ret = buffer + buffer_pos;
963   *buffer_size_ret = buffer_size - buffer_pos;
964   *field_ret = field;
965
966   return (0);
967 } /* }}} int buffer_get_field */
968
969 /* if we're restricting writes to the base directory,
970  * check whether the file falls within the dir
971  * returns 1 if OK, otherwise 0
972  */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
974 {
975   assert(file != NULL);
976
977   if (!config_write_base_only
978       || sock == NULL /* journal replay */
979       || config_base_dir == NULL)
980     return 1;
981
982   if (strstr(file, "../") != NULL) goto err;
983
984   /* relative paths without "../" are ok */
985   if (*file != '/') return 1;
986
987   /* file must be of the format base + "/" + <1+ char filename> */
988   if (strlen(file) < _config_base_dir_len + 2) goto err;
989   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990   if (*(file + _config_base_dir_len) != '/') goto err;
991
992   return 1;
993
994 err:
995   if (sock != NULL && sock->fd >= 0)
996     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
997
998   return 0;
999 } /* }}} static int check_file_access */
1000
1001 /* when using a base dir, convert relative paths to absolute paths.
1002  * if necessary, modifies the "filename" pointer to point
1003  * to the new path created in "tmp".  "tmp" is provided
1004  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005  *
1006  * this allows us to optimize for the expected case (absolute path)
1007  * with a no-op.
1008  */
1009 static void get_abs_path(char **filename, char *tmp)
1010 {
1011   assert(tmp != NULL);
1012   assert(filename != NULL && *filename != NULL);
1013
1014   if (config_base_dir == NULL || **filename == '/')
1015     return;
1016
1017   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018   *filename = tmp;
1019 } /* }}} static int get_abs_path */
1020
1021 /* returns 1 if we have the required privilege level,
1022  * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024                           socket_privilege priv)
1025 {
1026   if (sock == NULL) /* journal replay */
1027     return 1;
1028
1029   if (sock->privilege >= priv)
1030     return 1;
1031
1032   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1034
1035 static int flush_file (const char *filename) /* {{{ */
1036 {
1037   cache_item_t *ci;
1038
1039   pthread_mutex_lock (&cache_lock);
1040
1041   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042   if (ci == NULL)
1043   {
1044     pthread_mutex_unlock (&cache_lock);
1045     return (ENOENT);
1046   }
1047
1048   if (ci->values_num > 0)
1049   {
1050     /* Enqueue at head */
1051     enqueue_cache_item (ci, HEAD);
1052     pthread_cond_wait(&ci->flushed, &cache_lock);
1053   }
1054
1055   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1056    * may have been purged during our cond_wait() */
1057
1058   pthread_mutex_unlock(&cache_lock);
1059
1060   return (0);
1061 } /* }}} int flush_file */
1062
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064     char *buffer, size_t buffer_size)
1065 {
1066   int status;
1067   char **help_text;
1068   char *command;
1069
1070   char *help_help[2] =
1071   {
1072     "Command overview\n"
1073     ,
1074     "HELP [<command>]\n"
1075     "FLUSH <filename>\n"
1076     "FLUSHALL\n"
1077     "PENDING <filename>\n"
1078     "FORGET <filename>\n"
1079     "QUEUE\n"
1080     "UPDATE <filename> <values> [<values> ...]\n"
1081     "BATCH\n"
1082     "STATS\n"
1083     "QUIT\n"
1084   };
1085
1086   char *help_flush[2] =
1087   {
1088     "Help for FLUSH\n"
1089     ,
1090     "Usage: FLUSH <filename>\n"
1091     "\n"
1092     "Adds the given filename to the head of the update queue and returns\n"
1093     "after is has been dequeued.\n"
1094   };
1095
1096   char *help_flushall[2] =
1097   {
1098     "Help for FLUSHALL\n"
1099     ,
1100     "Usage: FLUSHALL\n"
1101     "\n"
1102     "Triggers writing of all pending updates.  Returns immediately.\n"
1103   };
1104
1105   char *help_pending[2] =
1106   {
1107     "Help for PENDING\n"
1108     ,
1109     "Usage: PENDING <filename>\n"
1110     "\n"
1111     "Shows any 'pending' updates for a file, in order.\n"
1112     "The updates shown have not yet been written to the underlying RRD file.\n"
1113   };
1114
1115   char *help_forget[2] =
1116   {
1117     "Help for FORGET\n"
1118     ,
1119     "Usage: FORGET <filename>\n"
1120     "\n"
1121     "Removes the file completely from the cache.\n"
1122     "Any pending updates for the file will be lost.\n"
1123   };
1124
1125   char *help_queue[2] =
1126   {
1127     "Help for QUEUE\n"
1128     ,
1129     "Shows all files in the output queue.\n"
1130     "The output is zero or more lines in the following format:\n"
1131     "(where <num_vals> is the number of values to be written)\n"
1132     "\n"
1133     "<num_vals> <filename>\n"
1134     "\n"
1135   };
1136
1137   char *help_update[2] =
1138   {
1139     "Help for UPDATE\n"
1140     ,
1141     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1142     "\n"
1143     "Adds the given file to the internal cache if it is not yet known and\n"
1144     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1145     "for details.\n"
1146     "\n"
1147     "Each <values> has the following form:\n"
1148     "  <values> = <time>:<value>[:<value>[...]]\n"
1149     "See the rrdupdate(1) manpage for details.\n"
1150   };
1151
1152   char *help_stats[2] =
1153   {
1154     "Help for STATS\n"
1155     ,
1156     "Usage: STATS\n"
1157     "\n"
1158     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1159     "a description of the values.\n"
1160   };
1161
1162   char *help_batch[2] =
1163   {
1164     "Help for BATCH\n"
1165     ,
1166     "The 'BATCH' command permits the client to initiate a bulk load\n"
1167     "   of commands to rrdcached.\n"
1168     "\n"
1169     "Usage:\n"
1170     "\n"
1171     "    client: BATCH\n"
1172     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1173     "    client: command #1\n"
1174     "    client: command #2\n"
1175     "    client: ... and so on\n"
1176     "    client: .\n"
1177     "    server: 2 errors\n"
1178     "    server: 7 message for command #7\n"
1179     "    server: 9 message for command #9\n"
1180     "\n"
1181     "For more information, consult the rrdcached(1) documentation.\n"
1182   };
1183
1184   char *help_quit[2] =
1185   {
1186     "Help for QUIT\n"
1187     ,
1188     "Disconnect from rrdcached.\n"
1189   };
1190
1191   status = buffer_get_field (&buffer, &buffer_size, &command);
1192   if (status != 0)
1193     help_text = help_help;
1194   else
1195   {
1196     if (strcasecmp (command, "update") == 0)
1197       help_text = help_update;
1198     else if (strcasecmp (command, "flush") == 0)
1199       help_text = help_flush;
1200     else if (strcasecmp (command, "flushall") == 0)
1201       help_text = help_flushall;
1202     else if (strcasecmp (command, "pending") == 0)
1203       help_text = help_pending;
1204     else if (strcasecmp (command, "forget") == 0)
1205       help_text = help_forget;
1206     else if (strcasecmp (command, "queue") == 0)
1207       help_text = help_queue;
1208     else if (strcasecmp (command, "stats") == 0)
1209       help_text = help_stats;
1210     else if (strcasecmp (command, "batch") == 0)
1211       help_text = help_batch;
1212     else if (strcasecmp (command, "quit") == 0)
1213       help_text = help_quit;
1214     else
1215       help_text = help_help;
1216   }
1217
1218   add_response_info(sock, help_text[1]);
1219   return send_response(sock, RESP_OK, help_text[0]);
1220 } /* }}} int handle_request_help */
1221
1222 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1223 {
1224   uint64_t copy_queue_length;
1225   uint64_t copy_updates_received;
1226   uint64_t copy_flush_received;
1227   uint64_t copy_updates_written;
1228   uint64_t copy_data_sets_written;
1229   uint64_t copy_journal_bytes;
1230   uint64_t copy_journal_rotate;
1231
1232   uint64_t tree_nodes_number;
1233   uint64_t tree_depth;
1234
1235   pthread_mutex_lock (&stats_lock);
1236   copy_queue_length       = stats_queue_length;
1237   copy_updates_received   = stats_updates_received;
1238   copy_flush_received     = stats_flush_received;
1239   copy_updates_written    = stats_updates_written;
1240   copy_data_sets_written  = stats_data_sets_written;
1241   copy_journal_bytes      = stats_journal_bytes;
1242   copy_journal_rotate     = stats_journal_rotate;
1243   pthread_mutex_unlock (&stats_lock);
1244
1245   pthread_mutex_lock (&cache_lock);
1246   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1247   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1248   pthread_mutex_unlock (&cache_lock);
1249
1250   add_response_info(sock,
1251                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1252   add_response_info(sock,
1253                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1254   add_response_info(sock,
1255                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1256   add_response_info(sock,
1257                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1258   add_response_info(sock,
1259                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1260   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1261   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1262   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1263   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1264
1265   send_response(sock, RESP_OK, "Statistics follow\n");
1266
1267   return (0);
1268 } /* }}} int handle_request_stats */
1269
1270 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1271     char *buffer, size_t buffer_size)
1272 {
1273   char *file, file_tmp[PATH_MAX];
1274   int status;
1275
1276   status = buffer_get_field (&buffer, &buffer_size, &file);
1277   if (status != 0)
1278   {
1279     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1280   }
1281   else
1282   {
1283     pthread_mutex_lock(&stats_lock);
1284     stats_flush_received++;
1285     pthread_mutex_unlock(&stats_lock);
1286
1287     get_abs_path(&file, file_tmp);
1288     if (!check_file_access(file, sock)) return 0;
1289
1290     status = flush_file (file);
1291     if (status == 0)
1292       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1293     else if (status == ENOENT)
1294     {
1295       /* no file in our tree; see whether it exists at all */
1296       struct stat statbuf;
1297
1298       memset(&statbuf, 0, sizeof(statbuf));
1299       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1300         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1301       else
1302         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1303     }
1304     else if (status < 0)
1305       return send_response(sock, RESP_ERR, "Internal error.\n");
1306     else
1307       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1308   }
1309
1310   /* NOTREACHED */
1311   assert(1==0);
1312 } /* }}} int handle_request_flush */
1313
1314 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1315 {
1316   int status;
1317
1318   status = has_privilege(sock, PRIV_HIGH);
1319   if (status <= 0)
1320     return status;
1321
1322   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1323
1324   pthread_mutex_lock(&cache_lock);
1325   flush_old_values(-1);
1326   pthread_mutex_unlock(&cache_lock);
1327
1328   return send_response(sock, RESP_OK, "Started flush.\n");
1329 } /* }}} static int handle_request_flushall */
1330
1331 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1332                                   char *buffer, size_t buffer_size)
1333 {
1334   int status;
1335   char *file, file_tmp[PATH_MAX];
1336   cache_item_t *ci;
1337
1338   status = buffer_get_field(&buffer, &buffer_size, &file);
1339   if (status != 0)
1340     return send_response(sock, RESP_ERR,
1341                          "Usage: PENDING <filename>\n");
1342
1343   status = has_privilege(sock, PRIV_HIGH);
1344   if (status <= 0)
1345     return status;
1346
1347   get_abs_path(&file, file_tmp);
1348
1349   pthread_mutex_lock(&cache_lock);
1350   ci = g_tree_lookup(cache_tree, file);
1351   if (ci == NULL)
1352   {
1353     pthread_mutex_unlock(&cache_lock);
1354     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1355   }
1356
1357   for (int i=0; i < ci->values_num; i++)
1358     add_response_info(sock, "%s\n", ci->values[i]);
1359
1360   pthread_mutex_unlock(&cache_lock);
1361   return send_response(sock, RESP_OK, "updates pending\n");
1362 } /* }}} static int handle_request_pending */
1363
1364 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1365                                  char *buffer, size_t buffer_size)
1366 {
1367   int status;
1368   gboolean found;
1369   char *file, file_tmp[PATH_MAX];
1370
1371   status = buffer_get_field(&buffer, &buffer_size, &file);
1372   if (status != 0)
1373     return send_response(sock, RESP_ERR,
1374                          "Usage: FORGET <filename>\n");
1375
1376   status = has_privilege(sock, PRIV_HIGH);
1377   if (status <= 0)
1378     return status;
1379
1380   get_abs_path(&file, file_tmp);
1381   if (!check_file_access(file, sock)) return 0;
1382
1383   pthread_mutex_lock(&cache_lock);
1384   found = g_tree_remove(cache_tree, file);
1385   pthread_mutex_unlock(&cache_lock);
1386
1387   if (found == TRUE)
1388   {
1389     if (sock != NULL)
1390       journal_write("forget", file);
1391
1392     return send_response(sock, RESP_OK, "Gone!\n");
1393   }
1394   else
1395     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1396
1397   /* NOTREACHED */
1398   assert(1==0);
1399 } /* }}} static int handle_request_forget */
1400
1401 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1402 {
1403   cache_item_t *ci;
1404
1405   pthread_mutex_lock(&cache_lock);
1406
1407   ci = cache_queue_head;
1408   while (ci != NULL)
1409   {
1410     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1411     ci = ci->next;
1412   }
1413
1414   pthread_mutex_unlock(&cache_lock);
1415
1416   return send_response(sock, RESP_OK, "in queue.\n");
1417 } /* }}} int handle_request_queue */
1418
1419 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1420                                   time_t now,
1421                                   char *buffer, size_t buffer_size)
1422 {
1423   char *file, file_tmp[PATH_MAX];
1424   int values_num = 0;
1425   int status;
1426   char orig_buf[CMD_MAX];
1427
1428   cache_item_t *ci;
1429
1430   status = has_privilege(sock, PRIV_HIGH);
1431   if (status <= 0)
1432     return status;
1433
1434   /* save it for the journal later */
1435   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1436
1437   status = buffer_get_field (&buffer, &buffer_size, &file);
1438   if (status != 0)
1439     return send_response(sock, RESP_ERR,
1440                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1441
1442   pthread_mutex_lock(&stats_lock);
1443   stats_updates_received++;
1444   pthread_mutex_unlock(&stats_lock);
1445
1446   get_abs_path(&file, file_tmp);
1447   if (!check_file_access(file, sock)) return 0;
1448
1449   pthread_mutex_lock (&cache_lock);
1450   ci = g_tree_lookup (cache_tree, file);
1451
1452   if (ci == NULL) /* {{{ */
1453   {
1454     struct stat statbuf;
1455
1456     /* don't hold the lock while we setup; stat(2) might block */
1457     pthread_mutex_unlock(&cache_lock);
1458
1459     memset (&statbuf, 0, sizeof (statbuf));
1460     status = stat (file, &statbuf);
1461     if (status != 0)
1462     {
1463       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1464
1465       status = errno;
1466       if (status == ENOENT)
1467         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1468       else
1469         return send_response(sock, RESP_ERR,
1470                              "stat failed with error %i.\n", status);
1471     }
1472     if (!S_ISREG (statbuf.st_mode))
1473       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1474
1475     if (access(file, R_OK|W_OK) != 0)
1476       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1477                            file, rrd_strerror(errno));
1478
1479     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1480     if (ci == NULL)
1481     {
1482       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1483
1484       return send_response(sock, RESP_ERR, "malloc failed.\n");
1485     }
1486     memset (ci, 0, sizeof (cache_item_t));
1487
1488     ci->file = strdup (file);
1489     if (ci->file == NULL)
1490     {
1491       free (ci);
1492       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1493
1494       return send_response(sock, RESP_ERR, "strdup failed.\n");
1495     }
1496
1497     wipe_ci_values(ci, now);
1498     ci->flags = CI_FLAGS_IN_TREE;
1499     pthread_cond_init(&ci->flushed, NULL);
1500
1501     pthread_mutex_lock(&cache_lock);
1502     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1503   } /* }}} */
1504   assert (ci != NULL);
1505
1506   /* don't re-write updates in replay mode */
1507   if (sock != NULL)
1508     journal_write("update", orig_buf);
1509
1510   while (buffer_size > 0)
1511   {
1512     char **temp;
1513     char *value;
1514     time_t stamp;
1515     char *eostamp;
1516
1517     status = buffer_get_field (&buffer, &buffer_size, &value);
1518     if (status != 0)
1519     {
1520       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1521       break;
1522     }
1523
1524     /* make sure update time is always moving forward */
1525     stamp = strtol(value, &eostamp, 10);
1526     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1527     {
1528       pthread_mutex_unlock(&cache_lock);
1529       return send_response(sock, RESP_ERR,
1530                            "Cannot find timestamp in '%s'!\n", value);
1531     }
1532     else if (stamp <= ci->last_update_stamp)
1533     {
1534       pthread_mutex_unlock(&cache_lock);
1535       return send_response(sock, RESP_ERR,
1536                            "illegal attempt to update using time %ld when last"
1537                            " update time is %ld (minimum one second step)\n",
1538                            stamp, ci->last_update_stamp);
1539     }
1540     else
1541       ci->last_update_stamp = stamp;
1542
1543     temp = (char **) rrd_realloc (ci->values,
1544         sizeof (char *) * (ci->values_num + 1));
1545     if (temp == NULL)
1546     {
1547       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1548       continue;
1549     }
1550     ci->values = temp;
1551
1552     ci->values[ci->values_num] = strdup (value);
1553     if (ci->values[ci->values_num] == NULL)
1554     {
1555       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1556       continue;
1557     }
1558     ci->values_num++;
1559
1560     values_num++;
1561   }
1562
1563   if (((now - ci->last_flush_time) >= config_write_interval)
1564       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1565       && (ci->values_num > 0))
1566   {
1567     enqueue_cache_item (ci, TAIL);
1568   }
1569
1570   pthread_mutex_unlock (&cache_lock);
1571
1572   if (values_num < 1)
1573     return send_response(sock, RESP_ERR, "No values updated.\n");
1574   else
1575     return send_response(sock, RESP_OK,
1576                          "errors, enqueued %i value(s).\n", values_num);
1577
1578   /* NOTREACHED */
1579   assert(1==0);
1580
1581 } /* }}} int handle_request_update */
1582
1583 /* we came across a "WROTE" entry during journal replay.
1584  * throw away any values that we have accumulated for this file
1585  */
1586 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1587 {
1588   int i;
1589   cache_item_t *ci;
1590   const char *file = buffer;
1591
1592   pthread_mutex_lock(&cache_lock);
1593
1594   ci = g_tree_lookup(cache_tree, file);
1595   if (ci == NULL)
1596   {
1597     pthread_mutex_unlock(&cache_lock);
1598     return (0);
1599   }
1600
1601   if (ci->values)
1602   {
1603     for (i=0; i < ci->values_num; i++)
1604       free(ci->values[i]);
1605
1606     free(ci->values);
1607   }
1608
1609   wipe_ci_values(ci, now);
1610   remove_from_queue(ci);
1611
1612   pthread_mutex_unlock(&cache_lock);
1613   return (0);
1614 } /* }}} int handle_request_wrote */
1615
1616 /* start "BATCH" processing */
1617 static int batch_start (listen_socket_t *sock) /* {{{ */
1618 {
1619   int status;
1620   if (sock->batch_start)
1621     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1622
1623   status = send_response(sock, RESP_OK,
1624                          "Go ahead.  End with dot '.' on its own line.\n");
1625   sock->batch_start = time(NULL);
1626   sock->batch_cmd = 0;
1627
1628   return status;
1629 } /* }}} static int batch_start */
1630
1631 /* finish "BATCH" processing and return results to the client */
1632 static int batch_done (listen_socket_t *sock) /* {{{ */
1633 {
1634   assert(sock->batch_start);
1635   sock->batch_start = 0;
1636   sock->batch_cmd  = 0;
1637   return send_response(sock, RESP_OK, "errors\n");
1638 } /* }}} static int batch_done */
1639
1640 /* if sock==NULL, we are in journal replay mode */
1641 static int handle_request (listen_socket_t *sock, /* {{{ */
1642                            time_t now,
1643                            char *buffer, size_t buffer_size)
1644 {
1645   char *buffer_ptr;
1646   char *command;
1647   int status;
1648
1649   assert (buffer[buffer_size - 1] == '\0');
1650
1651   buffer_ptr = buffer;
1652   command = NULL;
1653   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1654   if (status != 0)
1655   {
1656     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1657     return (-1);
1658   }
1659
1660   if (sock != NULL && sock->batch_start)
1661     sock->batch_cmd++;
1662
1663   if (strcasecmp (command, "update") == 0)
1664     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1665   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1666   {
1667     /* this is only valid in replay mode */
1668     return (handle_request_wrote (buffer_ptr, now));
1669   }
1670   else if (strcasecmp (command, "flush") == 0)
1671     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1672   else if (strcasecmp (command, "flushall") == 0)
1673     return (handle_request_flushall(sock));
1674   else if (strcasecmp (command, "pending") == 0)
1675     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1676   else if (strcasecmp (command, "forget") == 0)
1677     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1678   else if (strcasecmp (command, "queue") == 0)
1679     return (handle_request_queue(sock));
1680   else if (strcasecmp (command, "stats") == 0)
1681     return (handle_request_stats (sock));
1682   else if (strcasecmp (command, "help") == 0)
1683     return (handle_request_help (sock, buffer_ptr, buffer_size));
1684   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1685     return batch_start(sock);
1686   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1687     return batch_done(sock);
1688   else if (strcasecmp (command, "quit") == 0)
1689     return -1;
1690   else
1691     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1692
1693   /* NOTREACHED */
1694   assert(1==0);
1695 } /* }}} int handle_request */
1696
1697 /* MUST NOT hold journal_lock before calling this */
1698 static void journal_rotate(void) /* {{{ */
1699 {
1700   FILE *old_fh = NULL;
1701   int new_fd;
1702
1703   if (journal_cur == NULL || journal_old == NULL)
1704     return;
1705
1706   pthread_mutex_lock(&journal_lock);
1707
1708   /* we rotate this way (rename before close) so that the we can release
1709    * the journal lock as fast as possible.  Journal writes to the new
1710    * journal can proceed immediately after the new file is opened.  The
1711    * fclose can then block without affecting new updates.
1712    */
1713   if (journal_fh != NULL)
1714   {
1715     old_fh = journal_fh;
1716     journal_fh = NULL;
1717     rename(journal_cur, journal_old);
1718     ++stats_journal_rotate;
1719   }
1720
1721   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1722                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1723   if (new_fd >= 0)
1724   {
1725     journal_fh = fdopen(new_fd, "a");
1726     if (journal_fh == NULL)
1727       close(new_fd);
1728   }
1729
1730   pthread_mutex_unlock(&journal_lock);
1731
1732   if (old_fh != NULL)
1733     fclose(old_fh);
1734
1735   if (journal_fh == NULL)
1736   {
1737     RRDD_LOG(LOG_CRIT,
1738              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1739              journal_cur, rrd_strerror(errno));
1740
1741     RRDD_LOG(LOG_ERR,
1742              "JOURNALING DISABLED: All values will be flushed at shutdown");
1743     config_flush_at_shutdown = 1;
1744   }
1745
1746 } /* }}} static void journal_rotate */
1747
1748 static void journal_done(void) /* {{{ */
1749 {
1750   if (journal_cur == NULL)
1751     return;
1752
1753   pthread_mutex_lock(&journal_lock);
1754   if (journal_fh != NULL)
1755   {
1756     fclose(journal_fh);
1757     journal_fh = NULL;
1758   }
1759
1760   if (config_flush_at_shutdown)
1761   {
1762     RRDD_LOG(LOG_INFO, "removing journals");
1763     unlink(journal_old);
1764     unlink(journal_cur);
1765   }
1766   else
1767   {
1768     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1769              "journals will be used at next startup");
1770   }
1771
1772   pthread_mutex_unlock(&journal_lock);
1773
1774 } /* }}} static void journal_done */
1775
1776 static int journal_write(char *cmd, char *args) /* {{{ */
1777 {
1778   int chars;
1779
1780   if (journal_fh == NULL)
1781     return 0;
1782
1783   pthread_mutex_lock(&journal_lock);
1784   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1785   pthread_mutex_unlock(&journal_lock);
1786
1787   if (chars > 0)
1788   {
1789     pthread_mutex_lock(&stats_lock);
1790     stats_journal_bytes += chars;
1791     pthread_mutex_unlock(&stats_lock);
1792   }
1793
1794   return chars;
1795 } /* }}} static int journal_write */
1796
1797 static int journal_replay (const char *file) /* {{{ */
1798 {
1799   FILE *fh;
1800   int entry_cnt = 0;
1801   int fail_cnt = 0;
1802   uint64_t line = 0;
1803   char entry[CMD_MAX];
1804   time_t now;
1805
1806   if (file == NULL) return 0;
1807
1808   {
1809     char *reason = "unknown error";
1810     int status = 0;
1811     struct stat statbuf;
1812
1813     memset(&statbuf, 0, sizeof(statbuf));
1814     if (stat(file, &statbuf) != 0)
1815     {
1816       if (errno == ENOENT)
1817         return 0;
1818
1819       reason = "stat error";
1820       status = errno;
1821     }
1822     else if (!S_ISREG(statbuf.st_mode))
1823     {
1824       reason = "not a regular file";
1825       status = EPERM;
1826     }
1827     if (statbuf.st_uid != daemon_uid)
1828     {
1829       reason = "not owned by daemon user";
1830       status = EACCES;
1831     }
1832     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1833     {
1834       reason = "must not be user/group writable";
1835       status = EACCES;
1836     }
1837
1838     if (status != 0)
1839     {
1840       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1841                file, rrd_strerror(status), reason);
1842       return 0;
1843     }
1844   }
1845
1846   fh = fopen(file, "r");
1847   if (fh == NULL)
1848   {
1849     if (errno != ENOENT)
1850       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1851                file, rrd_strerror(errno));
1852     return 0;
1853   }
1854   else
1855     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1856
1857   now = time(NULL);
1858
1859   while(!feof(fh))
1860   {
1861     size_t entry_len;
1862
1863     ++line;
1864     if (fgets(entry, sizeof(entry), fh) == NULL)
1865       break;
1866     entry_len = strlen(entry);
1867
1868     /* check \n termination in case journal writing crashed mid-line */
1869     if (entry_len == 0)
1870       continue;
1871     else if (entry[entry_len - 1] != '\n')
1872     {
1873       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1874       ++fail_cnt;
1875       continue;
1876     }
1877
1878     entry[entry_len - 1] = '\0';
1879
1880     if (handle_request(NULL, now, entry, entry_len) == 0)
1881       ++entry_cnt;
1882     else
1883       ++fail_cnt;
1884   }
1885
1886   fclose(fh);
1887
1888   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1889            entry_cnt, fail_cnt);
1890
1891   return entry_cnt > 0 ? 1 : 0;
1892 } /* }}} static int journal_replay */
1893
1894 static void journal_init(void) /* {{{ */
1895 {
1896   int had_journal = 0;
1897
1898   if (journal_cur == NULL) return;
1899
1900   pthread_mutex_lock(&journal_lock);
1901
1902   RRDD_LOG(LOG_INFO, "checking for journal files");
1903
1904   had_journal += journal_replay(journal_old);
1905   had_journal += journal_replay(journal_cur);
1906
1907   /* it must have been a crash.  start a flush */
1908   if (had_journal && config_flush_at_shutdown)
1909     flush_old_values(-1);
1910
1911   pthread_mutex_unlock(&journal_lock);
1912   journal_rotate();
1913
1914   RRDD_LOG(LOG_INFO, "journal processing complete");
1915
1916 } /* }}} static void journal_init */
1917
1918 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1919 {
1920   assert(sock != NULL);
1921
1922   free(sock->rbuf);  sock->rbuf = NULL;
1923   free(sock->wbuf);  sock->wbuf = NULL;
1924   free(sock);
1925 } /* }}} void free_listen_socket */
1926
1927 static void close_connection(listen_socket_t *sock) /* {{{ */
1928 {
1929   if (sock->fd >= 0)
1930   {
1931     close(sock->fd);
1932     sock->fd = -1;
1933   }
1934
1935   free_listen_socket(sock);
1936
1937 } /* }}} void close_connection */
1938
1939 static void *connection_thread_main (void *args) /* {{{ */
1940 {
1941   listen_socket_t *sock;
1942   int i;
1943   int fd;
1944
1945   sock = (listen_socket_t *) args;
1946   fd = sock->fd;
1947
1948   /* init read buffers */
1949   sock->next_read = sock->next_cmd = 0;
1950   sock->rbuf = malloc(RBUF_SIZE);
1951   if (sock->rbuf == NULL)
1952   {
1953     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1954     close_connection(sock);
1955     return NULL;
1956   }
1957
1958   pthread_mutex_lock (&connection_threads_lock);
1959   {
1960     pthread_t *temp;
1961
1962     temp = (pthread_t *) rrd_realloc (connection_threads,
1963         sizeof (pthread_t) * (connection_threads_num + 1));
1964     if (temp == NULL)
1965     {
1966       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1967     }
1968     else
1969     {
1970       connection_threads = temp;
1971       connection_threads[connection_threads_num] = pthread_self ();
1972       connection_threads_num++;
1973     }
1974   }
1975   pthread_mutex_unlock (&connection_threads_lock);
1976
1977   while (do_shutdown == 0)
1978   {
1979     char *cmd;
1980     ssize_t cmd_len;
1981     ssize_t rbytes;
1982     time_t now;
1983
1984     struct pollfd pollfd;
1985     int status;
1986
1987     pollfd.fd = fd;
1988     pollfd.events = POLLIN | POLLPRI;
1989     pollfd.revents = 0;
1990
1991     status = poll (&pollfd, 1, /* timeout = */ 500);
1992     if (do_shutdown)
1993       break;
1994     else if (status == 0) /* timeout */
1995       continue;
1996     else if (status < 0) /* error */
1997     {
1998       status = errno;
1999       if (status != EINTR)
2000         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2001       continue;
2002     }
2003
2004     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2005       break;
2006     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2007     {
2008       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2009           "poll(2) returned something unexpected: %#04hx",
2010           pollfd.revents);
2011       break;
2012     }
2013
2014     rbytes = read(fd, sock->rbuf + sock->next_read,
2015                   RBUF_SIZE - sock->next_read);
2016     if (rbytes < 0)
2017     {
2018       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2019       break;
2020     }
2021     else if (rbytes == 0)
2022       break; /* eof */
2023
2024     sock->next_read += rbytes;
2025
2026     if (sock->batch_start)
2027       now = sock->batch_start;
2028     else
2029       now = time(NULL);
2030
2031     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2032     {
2033       status = handle_request (sock, now, cmd, cmd_len+1);
2034       if (status != 0)
2035         goto out_close;
2036     }
2037   }
2038
2039 out_close:
2040   close_connection(sock);
2041
2042   /* Remove this thread from the connection threads list */
2043   pthread_mutex_lock (&connection_threads_lock);
2044   {
2045     pthread_t self;
2046     pthread_t *temp;
2047
2048     /* Find out own index in the array */
2049     self = pthread_self ();
2050     for (i = 0; i < connection_threads_num; i++)
2051       if (pthread_equal (connection_threads[i], self) != 0)
2052         break;
2053     assert (i < connection_threads_num);
2054
2055     /* Move the trailing threads forward. */
2056     if (i < (connection_threads_num - 1))
2057     {
2058       memmove (connection_threads + i,
2059                connection_threads + i + 1,
2060                sizeof (pthread_t) * (connection_threads_num - i - 1));
2061     }
2062
2063     connection_threads_num--;
2064
2065     temp = rrd_realloc(connection_threads,
2066                    sizeof(*connection_threads) * connection_threads_num);
2067     if (connection_threads_num > 0 && temp == NULL)
2068       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2069     else
2070       connection_threads = temp;
2071   }
2072   pthread_mutex_unlock (&connection_threads_lock);
2073
2074   return (NULL);
2075 } /* }}} void *connection_thread_main */
2076
2077 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2078 {
2079   int fd;
2080   struct sockaddr_un sa;
2081   listen_socket_t *temp;
2082   int status;
2083   const char *path;
2084
2085   path = sock->addr;
2086   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2087     path += strlen("unix:");
2088
2089   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2090       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2091   if (temp == NULL)
2092   {
2093     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2094     return (-1);
2095   }
2096   listen_fds = temp;
2097   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2098
2099   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2100   if (fd < 0)
2101   {
2102     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2103              rrd_strerror(errno));
2104     return (-1);
2105   }
2106
2107   memset (&sa, 0, sizeof (sa));
2108   sa.sun_family = AF_UNIX;
2109   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2110
2111   /* if we've gotten this far, we own the pid file.  any daemon started
2112    * with the same args must not be alive.  therefore, ensure that we can
2113    * create the socket...
2114    */
2115   unlink(path);
2116
2117   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2118   if (status != 0)
2119   {
2120     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2121              path, rrd_strerror(errno));
2122     close (fd);
2123     return (-1);
2124   }
2125
2126   status = listen (fd, /* backlog = */ 10);
2127   if (status != 0)
2128   {
2129     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2130              path, rrd_strerror(errno));
2131     close (fd);
2132     unlink (path);
2133     return (-1);
2134   }
2135
2136   listen_fds[listen_fds_num].fd = fd;
2137   listen_fds[listen_fds_num].family = PF_UNIX;
2138   strncpy(listen_fds[listen_fds_num].addr, path,
2139           sizeof (listen_fds[listen_fds_num].addr) - 1);
2140   listen_fds_num++;
2141
2142   return (0);
2143 } /* }}} int open_listen_socket_unix */
2144
2145 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2146 {
2147   struct addrinfo ai_hints;
2148   struct addrinfo *ai_res;
2149   struct addrinfo *ai_ptr;
2150   char addr_copy[NI_MAXHOST];
2151   char *addr;
2152   char *port;
2153   int status;
2154
2155   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2156   addr_copy[sizeof (addr_copy) - 1] = 0;
2157   addr = addr_copy;
2158
2159   memset (&ai_hints, 0, sizeof (ai_hints));
2160   ai_hints.ai_flags = 0;
2161 #ifdef AI_ADDRCONFIG
2162   ai_hints.ai_flags |= AI_ADDRCONFIG;
2163 #endif
2164   ai_hints.ai_family = AF_UNSPEC;
2165   ai_hints.ai_socktype = SOCK_STREAM;
2166
2167   port = NULL;
2168   if (*addr == '[') /* IPv6+port format */
2169   {
2170     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2171     addr++;
2172
2173     port = strchr (addr, ']');
2174     if (port == NULL)
2175     {
2176       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2177       return (-1);
2178     }
2179     *port = 0;
2180     port++;
2181
2182     if (*port == ':')
2183       port++;
2184     else if (*port == 0)
2185       port = NULL;
2186     else
2187     {
2188       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2189       return (-1);
2190     }
2191   } /* if (*addr = ']') */
2192   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2193   {
2194     port = rindex(addr, ':');
2195     if (port != NULL)
2196     {
2197       *port = 0;
2198       port++;
2199     }
2200   }
2201   ai_res = NULL;
2202   status = getaddrinfo (addr,
2203                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2204                         &ai_hints, &ai_res);
2205   if (status != 0)
2206   {
2207     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2208              addr, gai_strerror (status));
2209     return (-1);
2210   }
2211
2212   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2213   {
2214     int fd;
2215     listen_socket_t *temp;
2216     int one = 1;
2217
2218     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2219         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2220     if (temp == NULL)
2221     {
2222       fprintf (stderr,
2223                "rrdcached: open_listen_socket_network: realloc failed.\n");
2224       continue;
2225     }
2226     listen_fds = temp;
2227     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2228
2229     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2230     if (fd < 0)
2231     {
2232       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2233                rrd_strerror(errno));
2234       continue;
2235     }
2236
2237     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2238
2239     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2240     if (status != 0)
2241     {
2242       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2243                sock->addr, rrd_strerror(errno));
2244       close (fd);
2245       continue;
2246     }
2247
2248     status = listen (fd, /* backlog = */ 10);
2249     if (status != 0)
2250     {
2251       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2252                sock->addr, rrd_strerror(errno));
2253       close (fd);
2254       freeaddrinfo(ai_res);
2255       return (-1);
2256     }
2257
2258     listen_fds[listen_fds_num].fd = fd;
2259     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2260     listen_fds_num++;
2261   } /* for (ai_ptr) */
2262
2263   freeaddrinfo(ai_res);
2264   return (0);
2265 } /* }}} static int open_listen_socket_network */
2266
2267 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2268 {
2269   assert(sock != NULL);
2270   assert(sock->addr != NULL);
2271
2272   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2273       || sock->addr[0] == '/')
2274     return (open_listen_socket_unix(sock));
2275   else
2276     return (open_listen_socket_network(sock));
2277 } /* }}} int open_listen_socket */
2278
2279 static int close_listen_sockets (void) /* {{{ */
2280 {
2281   size_t i;
2282
2283   for (i = 0; i < listen_fds_num; i++)
2284   {
2285     close (listen_fds[i].fd);
2286
2287     if (listen_fds[i].family == PF_UNIX)
2288       unlink(listen_fds[i].addr);
2289   }
2290
2291   free (listen_fds);
2292   listen_fds = NULL;
2293   listen_fds_num = 0;
2294
2295   return (0);
2296 } /* }}} int close_listen_sockets */
2297
2298 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2299 {
2300   struct pollfd *pollfds;
2301   int pollfds_num;
2302   int status;
2303   int i;
2304
2305   if (listen_fds_num < 1)
2306   {
2307     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2308     return (NULL);
2309   }
2310
2311   pollfds_num = listen_fds_num;
2312   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2313   if (pollfds == NULL)
2314   {
2315     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2316     return (NULL);
2317   }
2318   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2319
2320   RRDD_LOG(LOG_INFO, "listening for connections");
2321
2322   while (do_shutdown == 0)
2323   {
2324     for (i = 0; i < pollfds_num; i++)
2325     {
2326       pollfds[i].fd = listen_fds[i].fd;
2327       pollfds[i].events = POLLIN | POLLPRI;
2328       pollfds[i].revents = 0;
2329     }
2330
2331     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2332     if (do_shutdown)
2333       break;
2334     else if (status == 0) /* timeout */
2335       continue;
2336     else if (status < 0) /* error */
2337     {
2338       status = errno;
2339       if (status != EINTR)
2340       {
2341         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2342       }
2343       continue;
2344     }
2345
2346     for (i = 0; i < pollfds_num; i++)
2347     {
2348       listen_socket_t *client_sock;
2349       struct sockaddr_storage client_sa;
2350       socklen_t client_sa_size;
2351       pthread_t tid;
2352       pthread_attr_t attr;
2353
2354       if (pollfds[i].revents == 0)
2355         continue;
2356
2357       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2358       {
2359         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2360             "poll(2) returned something unexpected for listen FD #%i.",
2361             pollfds[i].fd);
2362         continue;
2363       }
2364
2365       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2366       if (client_sock == NULL)
2367       {
2368         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2369         continue;
2370       }
2371       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2372
2373       client_sa_size = sizeof (client_sa);
2374       client_sock->fd = accept (pollfds[i].fd,
2375           (struct sockaddr *) &client_sa, &client_sa_size);
2376       if (client_sock->fd < 0)
2377       {
2378         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2379         free(client_sock);
2380         continue;
2381       }
2382
2383       pthread_attr_init (&attr);
2384       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2385
2386       status = pthread_create (&tid, &attr, connection_thread_main,
2387                                client_sock);
2388       if (status != 0)
2389       {
2390         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2391         close_connection(client_sock);
2392         continue;
2393       }
2394     } /* for (pollfds_num) */
2395   } /* while (do_shutdown == 0) */
2396
2397   RRDD_LOG(LOG_INFO, "starting shutdown");
2398
2399   close_listen_sockets ();
2400
2401   pthread_mutex_lock (&connection_threads_lock);
2402   while (connection_threads_num > 0)
2403   {
2404     pthread_t wait_for;
2405
2406     wait_for = connection_threads[0];
2407
2408     pthread_mutex_unlock (&connection_threads_lock);
2409     pthread_join (wait_for, /* retval = */ NULL);
2410     pthread_mutex_lock (&connection_threads_lock);
2411   }
2412   pthread_mutex_unlock (&connection_threads_lock);
2413
2414   free(pollfds);
2415
2416   return (NULL);
2417 } /* }}} void *listen_thread_main */
2418
2419 static int daemonize (void) /* {{{ */
2420 {
2421   int pid_fd;
2422   char *base_dir;
2423
2424   daemon_uid = geteuid();
2425
2426   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2427   if (pid_fd < 0)
2428     pid_fd = check_pidfile();
2429   if (pid_fd < 0)
2430     return pid_fd;
2431
2432   /* open all the listen sockets */
2433   if (config_listen_address_list_len > 0)
2434   {
2435     for (int i = 0; i < config_listen_address_list_len; i++)
2436     {
2437       open_listen_socket (config_listen_address_list[i]);
2438       free_listen_socket (config_listen_address_list[i]);
2439     }
2440
2441     free(config_listen_address_list);
2442   }
2443   else
2444   {
2445     listen_socket_t sock;
2446     memset(&sock, 0, sizeof(sock));
2447     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2448     open_listen_socket (&sock);
2449   }
2450
2451   if (listen_fds_num < 1)
2452   {
2453     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2454     goto error;
2455   }
2456
2457   if (!stay_foreground)
2458   {
2459     pid_t child;
2460
2461     child = fork ();
2462     if (child < 0)
2463     {
2464       fprintf (stderr, "daemonize: fork(2) failed.\n");
2465       goto error;
2466     }
2467     else if (child > 0)
2468       exit(0);
2469
2470     /* Become session leader */
2471     setsid ();
2472
2473     /* Open the first three file descriptors to /dev/null */
2474     close (2);
2475     close (1);
2476     close (0);
2477
2478     open ("/dev/null", O_RDWR);
2479     dup (0);
2480     dup (0);
2481   } /* if (!stay_foreground) */
2482
2483   /* Change into the /tmp directory. */
2484   base_dir = (config_base_dir != NULL)
2485     ? config_base_dir
2486     : "/tmp";
2487
2488   if (chdir (base_dir) != 0)
2489   {
2490     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2491     goto error;
2492   }
2493
2494   install_signal_handlers();
2495
2496   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2497   RRDD_LOG(LOG_INFO, "starting up");
2498
2499   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2500                                 (GDestroyNotify) free_cache_item);
2501   if (cache_tree == NULL)
2502   {
2503     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2504     goto error;
2505   }
2506
2507   return write_pidfile (pid_fd);
2508
2509 error:
2510   remove_pidfile();
2511   return -1;
2512 } /* }}} int daemonize */
2513
2514 static int cleanup (void) /* {{{ */
2515 {
2516   do_shutdown++;
2517
2518   pthread_cond_signal (&cache_cond);
2519   pthread_join (queue_thread, /* return = */ NULL);
2520
2521   remove_pidfile ();
2522
2523   free(config_base_dir);
2524   free(config_pid_file);
2525   free(journal_cur);
2526   free(journal_old);
2527
2528   pthread_mutex_lock(&cache_lock);
2529   g_tree_destroy(cache_tree);
2530
2531   RRDD_LOG(LOG_INFO, "goodbye");
2532   closelog ();
2533
2534   return (0);
2535 } /* }}} int cleanup */
2536
2537 static int read_options (int argc, char **argv) /* {{{ */
2538 {
2539   int option;
2540   int status = 0;
2541
2542   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2543   {
2544     switch (option)
2545     {
2546       case 'g':
2547         stay_foreground=1;
2548         break;
2549
2550       case 'L':
2551       case 'l':
2552       {
2553         listen_socket_t **temp;
2554         listen_socket_t *new;
2555
2556         new = malloc(sizeof(listen_socket_t));
2557         if (new == NULL)
2558         {
2559           fprintf(stderr, "read_options: malloc failed.\n");
2560           return(2);
2561         }
2562         memset(new, 0, sizeof(listen_socket_t));
2563
2564         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2565             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2566         if (temp == NULL)
2567         {
2568           fprintf (stderr, "read_options: realloc failed.\n");
2569           return (2);
2570         }
2571         config_listen_address_list = temp;
2572
2573         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2574         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2575
2576         temp[config_listen_address_list_len] = new;
2577         config_listen_address_list_len++;
2578       }
2579       break;
2580
2581       case 'f':
2582       {
2583         int temp;
2584
2585         temp = atoi (optarg);
2586         if (temp > 0)
2587           config_flush_interval = temp;
2588         else
2589         {
2590           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2591           status = 3;
2592         }
2593       }
2594       break;
2595
2596       case 'w':
2597       {
2598         int temp;
2599
2600         temp = atoi (optarg);
2601         if (temp > 0)
2602           config_write_interval = temp;
2603         else
2604         {
2605           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2606           status = 2;
2607         }
2608       }
2609       break;
2610
2611       case 'z':
2612       {
2613         int temp;
2614
2615         temp = atoi(optarg);
2616         if (temp > 0)
2617           config_write_jitter = temp;
2618         else
2619         {
2620           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2621           status = 2;
2622         }
2623
2624         break;
2625       }
2626
2627       case 'B':
2628         config_write_base_only = 1;
2629         break;
2630
2631       case 'b':
2632       {
2633         size_t len;
2634         char base_realpath[PATH_MAX];
2635
2636         if (config_base_dir != NULL)
2637           free (config_base_dir);
2638         config_base_dir = strdup (optarg);
2639         if (config_base_dir == NULL)
2640         {
2641           fprintf (stderr, "read_options: strdup failed.\n");
2642           return (3);
2643         }
2644
2645         /* make sure that the base directory is not resolved via
2646          * symbolic links.  this makes some performance-enhancing
2647          * assumptions possible (we don't have to resolve paths
2648          * that start with a "/")
2649          */
2650         if (realpath(config_base_dir, base_realpath) == NULL)
2651         {
2652           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2653           return 5;
2654         }
2655         else if (strncmp(config_base_dir,
2656                          base_realpath, sizeof(base_realpath)) != 0)
2657         {
2658           fprintf(stderr,
2659                   "Base directory (-b) resolved via file system links!\n"
2660                   "Please consult rrdcached '-b' documentation!\n"
2661                   "Consider specifying the real directory (%s)\n",
2662                   base_realpath);
2663           return 5;
2664         }
2665
2666         len = strlen (config_base_dir);
2667         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2668         {
2669           config_base_dir[len - 1] = 0;
2670           len--;
2671         }
2672
2673         if (len < 1)
2674         {
2675           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2676           return (4);
2677         }
2678
2679         _config_base_dir_len = len;
2680       }
2681       break;
2682
2683       case 'p':
2684       {
2685         if (config_pid_file != NULL)
2686           free (config_pid_file);
2687         config_pid_file = strdup (optarg);
2688         if (config_pid_file == NULL)
2689         {
2690           fprintf (stderr, "read_options: strdup failed.\n");
2691           return (3);
2692         }
2693       }
2694       break;
2695
2696       case 'F':
2697         config_flush_at_shutdown = 1;
2698         break;
2699
2700       case 'j':
2701       {
2702         struct stat statbuf;
2703         const char *dir = optarg;
2704
2705         status = stat(dir, &statbuf);
2706         if (status != 0)
2707         {
2708           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2709           return 6;
2710         }
2711
2712         if (!S_ISDIR(statbuf.st_mode)
2713             || access(dir, R_OK|W_OK|X_OK) != 0)
2714         {
2715           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2716                   errno ? rrd_strerror(errno) : "");
2717           return 6;
2718         }
2719
2720         journal_cur = malloc(PATH_MAX + 1);
2721         journal_old = malloc(PATH_MAX + 1);
2722         if (journal_cur == NULL || journal_old == NULL)
2723         {
2724           fprintf(stderr, "malloc failure for journal files\n");
2725           return 6;
2726         }
2727         else 
2728         {
2729           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2730           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2731         }
2732       }
2733       break;
2734
2735       case 'h':
2736       case '?':
2737         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2738             "\n"
2739             "Usage: rrdcached [options]\n"
2740             "\n"
2741             "Valid options are:\n"
2742             "  -l <address>  Socket address to listen to.\n"
2743             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2744             "  -w <seconds>  Interval in which to write data.\n"
2745             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2746             "  -f <seconds>  Interval in which to flush dead data.\n"
2747             "  -p <file>     Location of the PID-file.\n"
2748             "  -b <dir>      Base directory to change to.\n"
2749             "  -B            Restrict file access to paths within -b <dir>\n"
2750             "  -g            Do not fork and run in the foreground.\n"
2751             "  -j <dir>      Directory in which to create the journal files.\n"
2752             "  -F            Always flush all updates at shutdown\n"
2753             "\n"
2754             "For more information and a detailed description of all options "
2755             "please refer\n"
2756             "to the rrdcached(1) manual page.\n",
2757             VERSION);
2758         status = -1;
2759         break;
2760     } /* switch (option) */
2761   } /* while (getopt) */
2762
2763   /* advise the user when values are not sane */
2764   if (config_flush_interval < 2 * config_write_interval)
2765     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2766             " 2x write interval (-w) !\n");
2767   if (config_write_jitter > config_write_interval)
2768     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2769             " write interval (-w) !\n");
2770
2771   if (config_write_base_only && config_base_dir == NULL)
2772     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2773             "  Consult the rrdcached documentation\n");
2774
2775   if (journal_cur == NULL)
2776     config_flush_at_shutdown = 1;
2777
2778   return (status);
2779 } /* }}} int read_options */
2780
2781 int main (int argc, char **argv)
2782 {
2783   int status;
2784
2785   status = read_options (argc, argv);
2786   if (status != 0)
2787   {
2788     if (status < 0)
2789       status = 0;
2790     return (status);
2791   }
2792
2793   status = daemonize ();
2794   if (status != 0)
2795   {
2796     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2797     return (1);
2798   }
2799
2800   journal_init();
2801
2802   /* start the queue thread */
2803   memset (&queue_thread, 0, sizeof (queue_thread));
2804   status = pthread_create (&queue_thread,
2805                            NULL, /* attr */
2806                            queue_thread_main,
2807                            NULL); /* args */
2808   if (status != 0)
2809   {
2810     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2811     cleanup();
2812     return (1);
2813   }
2814
2815   listen_thread_main (NULL);
2816   cleanup ();
2817
2818   return (0);
2819 } /* int main */
2820
2821 /*
2822  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2823  */