97eed320a04a5008fcf880e6e5c7d142e0932da5
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
85
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
100
101 #include <glib-2.0/glib.h>
102 /* }}} */
103
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
105
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
109
110 /*
111  * Types
112  */
113 typedef enum
114 {
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
118
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
127
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
131
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
136
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
141
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
158
159 struct callback_flush_data_s
160 {
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
167
168 enum queue_side_e
169 {
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
174
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
178
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
184
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
187
188 static int do_shutdown = 0;
189
190 static pthread_t queue_thread;
191
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
195
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
202
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
211
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
214
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
223
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
232
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
242
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
247
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
252
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
258
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
264
265 static void install_signal_handlers(void) /* {{{ */
266 {
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
274
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
279
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
283
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
287
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
291
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
295
296 } /* }}} void install_signal_handlers */
297
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300   int fd;
301   char *file;
302
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
306
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
311
312   return(fd);
313 } /* }}} static int open_pidfile */
314
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
317 {
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
321
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
325
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
328
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
332
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
342
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
345
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
349
350   return pid_fd;
351 } /* }}} static int check_pidfile */
352
353 static int write_pidfile (int fd) /* {{{ */
354 {
355   pid_t pid;
356   FILE *fh;
357
358   pid = getpid ();
359
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
367
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
370
371   return (0);
372 } /* }}} int write_pidfile */
373
374 static int remove_pidfile (void) /* {{{ */
375 {
376   char *file;
377   int status;
378
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
382
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
388
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391   char *eol;
392
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
395
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
410
411     sock->next_cmd = eol - sock->rbuf + 1;
412
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
415
416     *len = eol - cmd;
417
418     return cmd;
419   }
420
421   /* NOTREACHED */
422   assert(1==0);
423 }
424
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428   char *new_buf;
429
430   assert(sock != NULL);
431
432   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
438
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
440
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
443
444   return 0;
445 } /* }}} static int add_to_wbuf */
446
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
453
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
456
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
469
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
472
473 static int count_lines(char *str) /* {{{ */
474 {
475   int lines = 0;
476
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
485
486   return lines;
487 } /* }}} static int count_lines */
488
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
494 {
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
500
501   if (sock == NULL) return rc;  /* journal replay mode */
502
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
513
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
524
525   len += rclen;
526
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
530
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
537
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
552
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
555
556   return 0;
557 } /* }}} */
558
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561   ci->values = NULL;
562   ci->values_num = 0;
563
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
567 }
568
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
577
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
582
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
587
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590
591   pthread_mutex_lock (&stats_lock);
592   assert (stats_queue_length > 0);
593   stats_queue_length--;
594   pthread_mutex_unlock (&stats_lock);
595
596 } /* }}} static void remove_from_queue */
597
598 /* free the resources associated with the cache_item_t
599  * must hold cache_lock when calling this function
600  */
601 static void *free_cache_item(cache_item_t *ci) /* {{{ */
602 {
603   if (ci == NULL) return NULL;
604
605   remove_from_queue(ci);
606
607   for (int i=0; i < ci->values_num; i++)
608     free(ci->values[i]);
609
610   free (ci->values);
611   free (ci->file);
612
613   /* in case anyone is waiting */
614   pthread_cond_broadcast(&ci->flushed);
615
616   free (ci);
617
618   return NULL;
619 } /* }}} static void *free_cache_item */
620
621 /*
622  * enqueue_cache_item:
623  * `cache_lock' must be acquired before calling this function!
624  */
625 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
626     queue_side_t side)
627 {
628   if (ci == NULL)
629     return (-1);
630
631   if (ci->values_num == 0)
632     return (0);
633
634   if (side == HEAD)
635   {
636     if (cache_queue_head == ci)
637       return 0;
638
639     /* remove if further down in queue */
640     remove_from_queue(ci);
641
642     ci->prev = NULL;
643     ci->next = cache_queue_head;
644     if (ci->next != NULL)
645       ci->next->prev = ci;
646     cache_queue_head = ci;
647
648     if (cache_queue_tail == NULL)
649       cache_queue_tail = cache_queue_head;
650   }
651   else /* (side == TAIL) */
652   {
653     /* We don't move values back in the list.. */
654     if (ci->flags & CI_FLAGS_IN_QUEUE)
655       return (0);
656
657     assert (ci->next == NULL);
658     assert (ci->prev == NULL);
659
660     ci->prev = cache_queue_tail;
661
662     if (cache_queue_tail == NULL)
663       cache_queue_head = ci;
664     else
665       cache_queue_tail->next = ci;
666
667     cache_queue_tail = ci;
668   }
669
670   ci->flags |= CI_FLAGS_IN_QUEUE;
671
672   pthread_cond_broadcast(&cache_cond);
673   pthread_mutex_lock (&stats_lock);
674   stats_queue_length++;
675   pthread_mutex_unlock (&stats_lock);
676
677   return (0);
678 } /* }}} int enqueue_cache_item */
679
680 /*
681  * tree_callback_flush:
682  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
683  * while this is in progress.
684  */
685 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
686     gpointer data)
687 {
688   cache_item_t *ci;
689   callback_flush_data_t *cfd;
690
691   ci = (cache_item_t *) value;
692   cfd = (callback_flush_data_t *) data;
693
694   if (ci->flags & CI_FLAGS_IN_QUEUE)
695     return FALSE;
696
697   if ((ci->last_flush_time <= cfd->abs_timeout)
698       && (ci->values_num > 0))
699   {
700     enqueue_cache_item (ci, TAIL);
701   }
702   else if ((do_shutdown != 0)
703       && (ci->values_num > 0))
704   {
705     enqueue_cache_item (ci, TAIL);
706   }
707   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
708       && (ci->values_num <= 0))
709   {
710     char **temp;
711
712     temp = (char **) rrd_realloc (cfd->keys,
713         sizeof (char *) * (cfd->keys_num + 1));
714     if (temp == NULL)
715     {
716       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
717       return (FALSE);
718     }
719     cfd->keys = temp;
720     /* Make really sure this points to the _same_ place */
721     assert ((char *) key == ci->file);
722     cfd->keys[cfd->keys_num] = (char *) key;
723     cfd->keys_num++;
724   }
725
726   return (FALSE);
727 } /* }}} gboolean tree_callback_flush */
728
729 static int flush_old_values (int max_age)
730 {
731   callback_flush_data_t cfd;
732   size_t k;
733
734   memset (&cfd, 0, sizeof (cfd));
735   /* Pass the current time as user data so that we don't need to call
736    * `time' for each node. */
737   cfd.now = time (NULL);
738   cfd.keys = NULL;
739   cfd.keys_num = 0;
740
741   if (max_age > 0)
742     cfd.abs_timeout = cfd.now - max_age;
743   else
744     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
745
746   /* `tree_callback_flush' will return the keys of all values that haven't
747    * been touched in the last `config_flush_interval' seconds in `cfd'.
748    * The char*'s in this array point to the same memory as ci->file, so we
749    * don't need to free them separately. */
750   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
751
752   for (k = 0; k < cfd.keys_num; k++)
753   {
754     /* should never fail, since we have held the cache_lock
755      * the entire time */
756     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
757   }
758
759   if (cfd.keys != NULL)
760   {
761     free (cfd.keys);
762     cfd.keys = NULL;
763   }
764
765   return (0);
766 } /* int flush_old_values */
767
768 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
769 {
770   struct timeval now;
771   struct timespec next_flush;
772   int final_flush = 0; /* make sure we only flush once on shutdown */
773
774   gettimeofday (&now, NULL);
775   next_flush.tv_sec = now.tv_sec + config_flush_interval;
776   next_flush.tv_nsec = 1000 * now.tv_usec;
777
778   pthread_mutex_lock (&cache_lock);
779   while ((do_shutdown == 0) || (cache_queue_head != NULL))
780   {
781     cache_item_t *ci;
782     char *file;
783     char **values;
784     int values_num;
785     int status;
786     int i;
787
788     /* First, check if it's time to do the cache flush. */
789     gettimeofday (&now, NULL);
790     if ((now.tv_sec > next_flush.tv_sec)
791         || ((now.tv_sec == next_flush.tv_sec)
792           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
793     {
794       /* Flush all values that haven't been written in the last
795        * `config_write_interval' seconds. */
796       flush_old_values (config_write_interval);
797
798       /* Determine the time of the next cache flush. */
799       next_flush.tv_sec =
800         now.tv_sec + next_flush.tv_sec % config_flush_interval;
801
802       /* unlock the cache while we rotate so we don't block incoming
803        * updates if the fsync() blocks on disk I/O */
804       pthread_mutex_unlock(&cache_lock);
805       journal_rotate();
806       pthread_mutex_lock(&cache_lock);
807     }
808
809     /* Now, check if there's something to store away. If not, wait until
810      * something comes in or it's time to do the cache flush.  if we are
811      * shutting down, do not wait around.  */
812     if (cache_queue_head == NULL && !do_shutdown)
813     {
814       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
815       if ((status != 0) && (status != ETIMEDOUT))
816       {
817         RRDD_LOG (LOG_ERR, "queue_thread_main: "
818             "pthread_cond_timedwait returned %i.", status);
819       }
820     }
821
822     /* We're about to shut down */
823     if (do_shutdown != 0 && !final_flush++)
824     {
825       if (config_flush_at_shutdown)
826         flush_old_values (-1); /* flush everything */
827       else
828         break;
829     }
830
831     /* Check if a value has arrived. This may be NULL if we timed out or there
832      * was an interrupt such as a signal. */
833     if (cache_queue_head == NULL)
834       continue;
835
836     ci = cache_queue_head;
837
838     /* copy the relevant parts */
839     file = strdup (ci->file);
840     if (file == NULL)
841     {
842       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
843       continue;
844     }
845
846     assert(ci->values != NULL);
847     assert(ci->values_num > 0);
848
849     values = ci->values;
850     values_num = ci->values_num;
851
852     wipe_ci_values(ci, time(NULL));
853     remove_from_queue(ci);
854
855     pthread_mutex_unlock (&cache_lock);
856
857     rrd_clear_error ();
858     status = rrd_update_r (file, NULL, values_num, (void *) values);
859     if (status != 0)
860     {
861       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
862           "rrd_update_r (%s) failed with status %i. (%s)",
863           file, status, rrd_get_error());
864     }
865
866     journal_write("wrote", file);
867     pthread_cond_broadcast(&ci->flushed);
868
869     for (i = 0; i < values_num; i++)
870       free (values[i]);
871
872     free(values);
873     free(file);
874
875     if (status == 0)
876     {
877       pthread_mutex_lock (&stats_lock);
878       stats_updates_written++;
879       stats_data_sets_written += values_num;
880       pthread_mutex_unlock (&stats_lock);
881     }
882
883     pthread_mutex_lock (&cache_lock);
884
885     /* We're about to shut down */
886     if (do_shutdown != 0 && !final_flush++)
887     {
888       if (config_flush_at_shutdown)
889           flush_old_values (-1); /* flush everything */
890       else
891         break;
892     }
893   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
894   pthread_mutex_unlock (&cache_lock);
895
896   if (config_flush_at_shutdown)
897   {
898     assert(cache_queue_head == NULL);
899     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
900   }
901
902   journal_done();
903
904   return (NULL);
905 } /* }}} void *queue_thread_main */
906
907 static int buffer_get_field (char **buffer_ret, /* {{{ */
908     size_t *buffer_size_ret, char **field_ret)
909 {
910   char *buffer;
911   size_t buffer_pos;
912   size_t buffer_size;
913   char *field;
914   size_t field_size;
915   int status;
916
917   buffer = *buffer_ret;
918   buffer_pos = 0;
919   buffer_size = *buffer_size_ret;
920   field = *buffer_ret;
921   field_size = 0;
922
923   if (buffer_size <= 0)
924     return (-1);
925
926   /* This is ensured by `handle_request'. */
927   assert (buffer[buffer_size - 1] == '\0');
928
929   status = -1;
930   while (buffer_pos < buffer_size)
931   {
932     /* Check for end-of-field or end-of-buffer */
933     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
934     {
935       field[field_size] = 0;
936       field_size++;
937       buffer_pos++;
938       status = 0;
939       break;
940     }
941     /* Handle escaped characters. */
942     else if (buffer[buffer_pos] == '\\')
943     {
944       if (buffer_pos >= (buffer_size - 1))
945         break;
946       buffer_pos++;
947       field[field_size] = buffer[buffer_pos];
948       field_size++;
949       buffer_pos++;
950     }
951     /* Normal operation */ 
952     else
953     {
954       field[field_size] = buffer[buffer_pos];
955       field_size++;
956       buffer_pos++;
957     }
958   } /* while (buffer_pos < buffer_size) */
959
960   if (status != 0)
961     return (status);
962
963   *buffer_ret = buffer + buffer_pos;
964   *buffer_size_ret = buffer_size - buffer_pos;
965   *field_ret = field;
966
967   return (0);
968 } /* }}} int buffer_get_field */
969
970 /* if we're restricting writes to the base directory,
971  * check whether the file falls within the dir
972  * returns 1 if OK, otherwise 0
973  */
974 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
975 {
976   assert(file != NULL);
977
978   if (!config_write_base_only
979       || sock == NULL /* journal replay */
980       || config_base_dir == NULL)
981     return 1;
982
983   if (strstr(file, "../") != NULL) goto err;
984
985   /* relative paths without "../" are ok */
986   if (*file != '/') return 1;
987
988   /* file must be of the format base + "/" + <1+ char filename> */
989   if (strlen(file) < _config_base_dir_len + 2) goto err;
990   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
991   if (*(file + _config_base_dir_len) != '/') goto err;
992
993   return 1;
994
995 err:
996   if (sock != NULL && sock->fd >= 0)
997     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
998
999   return 0;
1000 } /* }}} static int check_file_access */
1001
1002 /* when using a base dir, convert relative paths to absolute paths.
1003  * if necessary, modifies the "filename" pointer to point
1004  * to the new path created in "tmp".  "tmp" is provided
1005  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1006  *
1007  * this allows us to optimize for the expected case (absolute path)
1008  * with a no-op.
1009  */
1010 static void get_abs_path(char **filename, char *tmp)
1011 {
1012   assert(tmp != NULL);
1013   assert(filename != NULL && *filename != NULL);
1014
1015   if (config_base_dir == NULL || **filename == '/')
1016     return;
1017
1018   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1019   *filename = tmp;
1020 } /* }}} static int get_abs_path */
1021
1022 /* returns 1 if we have the required privilege level,
1023  * otherwise issue an error to the user on sock */
1024 static int has_privilege (listen_socket_t *sock, /* {{{ */
1025                           socket_privilege priv)
1026 {
1027   if (sock == NULL) /* journal replay */
1028     return 1;
1029
1030   if (sock->privilege >= priv)
1031     return 1;
1032
1033   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1034 } /* }}} static int has_privilege */
1035
1036 static int flush_file (const char *filename) /* {{{ */
1037 {
1038   cache_item_t *ci;
1039
1040   pthread_mutex_lock (&cache_lock);
1041
1042   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1043   if (ci == NULL)
1044   {
1045     pthread_mutex_unlock (&cache_lock);
1046     return (ENOENT);
1047   }
1048
1049   if (ci->values_num > 0)
1050   {
1051     /* Enqueue at head */
1052     enqueue_cache_item (ci, HEAD);
1053     pthread_cond_wait(&ci->flushed, &cache_lock);
1054   }
1055
1056   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1057    * may have been purged during our cond_wait() */
1058
1059   pthread_mutex_unlock(&cache_lock);
1060
1061   return (0);
1062 } /* }}} int flush_file */
1063
1064 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1065     char *buffer, size_t buffer_size)
1066 {
1067   int status;
1068   char **help_text;
1069   char *command;
1070
1071   char *help_help[2] =
1072   {
1073     "Command overview\n"
1074     ,
1075     "HELP [<command>]\n"
1076     "FLUSH <filename>\n"
1077     "FLUSHALL\n"
1078     "PENDING <filename>\n"
1079     "FORGET <filename>\n"
1080     "QUEUE\n"
1081     "UPDATE <filename> <values> [<values> ...]\n"
1082     "BATCH\n"
1083     "STATS\n"
1084     "QUIT\n"
1085   };
1086
1087   char *help_flush[2] =
1088   {
1089     "Help for FLUSH\n"
1090     ,
1091     "Usage: FLUSH <filename>\n"
1092     "\n"
1093     "Adds the given filename to the head of the update queue and returns\n"
1094     "after is has been dequeued.\n"
1095   };
1096
1097   char *help_flushall[2] =
1098   {
1099     "Help for FLUSHALL\n"
1100     ,
1101     "Usage: FLUSHALL\n"
1102     "\n"
1103     "Triggers writing of all pending updates.  Returns immediately.\n"
1104   };
1105
1106   char *help_pending[2] =
1107   {
1108     "Help for PENDING\n"
1109     ,
1110     "Usage: PENDING <filename>\n"
1111     "\n"
1112     "Shows any 'pending' updates for a file, in order.\n"
1113     "The updates shown have not yet been written to the underlying RRD file.\n"
1114   };
1115
1116   char *help_forget[2] =
1117   {
1118     "Help for FORGET\n"
1119     ,
1120     "Usage: FORGET <filename>\n"
1121     "\n"
1122     "Removes the file completely from the cache.\n"
1123     "Any pending updates for the file will be lost.\n"
1124   };
1125
1126   char *help_queue[2] =
1127   {
1128     "Help for QUEUE\n"
1129     ,
1130     "Shows all files in the output queue.\n"
1131     "The output is zero or more lines in the following format:\n"
1132     "(where <num_vals> is the number of values to be written)\n"
1133     "\n"
1134     "<num_vals> <filename>\n"
1135     "\n"
1136   };
1137
1138   char *help_update[2] =
1139   {
1140     "Help for UPDATE\n"
1141     ,
1142     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1143     "\n"
1144     "Adds the given file to the internal cache if it is not yet known and\n"
1145     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1146     "for details.\n"
1147     "\n"
1148     "Each <values> has the following form:\n"
1149     "  <values> = <time>:<value>[:<value>[...]]\n"
1150     "See the rrdupdate(1) manpage for details.\n"
1151   };
1152
1153   char *help_stats[2] =
1154   {
1155     "Help for STATS\n"
1156     ,
1157     "Usage: STATS\n"
1158     "\n"
1159     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1160     "a description of the values.\n"
1161   };
1162
1163   char *help_batch[2] =
1164   {
1165     "Help for BATCH\n"
1166     ,
1167     "The 'BATCH' command permits the client to initiate a bulk load\n"
1168     "   of commands to rrdcached.\n"
1169     "\n"
1170     "Usage:\n"
1171     "\n"
1172     "    client: BATCH\n"
1173     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1174     "    client: command #1\n"
1175     "    client: command #2\n"
1176     "    client: ... and so on\n"
1177     "    client: .\n"
1178     "    server: 2 errors\n"
1179     "    server: 7 message for command #7\n"
1180     "    server: 9 message for command #9\n"
1181     "\n"
1182     "For more information, consult the rrdcached(1) documentation.\n"
1183   };
1184
1185   char *help_quit[2] =
1186   {
1187     "Help for QUIT\n"
1188     ,
1189     "Disconnect from rrdcached.\n"
1190   };
1191
1192   status = buffer_get_field (&buffer, &buffer_size, &command);
1193   if (status != 0)
1194     help_text = help_help;
1195   else
1196   {
1197     if (strcasecmp (command, "update") == 0)
1198       help_text = help_update;
1199     else if (strcasecmp (command, "flush") == 0)
1200       help_text = help_flush;
1201     else if (strcasecmp (command, "flushall") == 0)
1202       help_text = help_flushall;
1203     else if (strcasecmp (command, "pending") == 0)
1204       help_text = help_pending;
1205     else if (strcasecmp (command, "forget") == 0)
1206       help_text = help_forget;
1207     else if (strcasecmp (command, "queue") == 0)
1208       help_text = help_queue;
1209     else if (strcasecmp (command, "stats") == 0)
1210       help_text = help_stats;
1211     else if (strcasecmp (command, "batch") == 0)
1212       help_text = help_batch;
1213     else if (strcasecmp (command, "quit") == 0)
1214       help_text = help_quit;
1215     else
1216       help_text = help_help;
1217   }
1218
1219   add_response_info(sock, help_text[1]);
1220   return send_response(sock, RESP_OK, help_text[0]);
1221 } /* }}} int handle_request_help */
1222
1223 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1224 {
1225   uint64_t copy_queue_length;
1226   uint64_t copy_updates_received;
1227   uint64_t copy_flush_received;
1228   uint64_t copy_updates_written;
1229   uint64_t copy_data_sets_written;
1230   uint64_t copy_journal_bytes;
1231   uint64_t copy_journal_rotate;
1232
1233   uint64_t tree_nodes_number;
1234   uint64_t tree_depth;
1235
1236   pthread_mutex_lock (&stats_lock);
1237   copy_queue_length       = stats_queue_length;
1238   copy_updates_received   = stats_updates_received;
1239   copy_flush_received     = stats_flush_received;
1240   copy_updates_written    = stats_updates_written;
1241   copy_data_sets_written  = stats_data_sets_written;
1242   copy_journal_bytes      = stats_journal_bytes;
1243   copy_journal_rotate     = stats_journal_rotate;
1244   pthread_mutex_unlock (&stats_lock);
1245
1246   pthread_mutex_lock (&cache_lock);
1247   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1248   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1249   pthread_mutex_unlock (&cache_lock);
1250
1251   add_response_info(sock,
1252                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1253   add_response_info(sock,
1254                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1255   add_response_info(sock,
1256                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1257   add_response_info(sock,
1258                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1259   add_response_info(sock,
1260                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1261   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1262   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1263   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1264   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1265
1266   send_response(sock, RESP_OK, "Statistics follow\n");
1267
1268   return (0);
1269 } /* }}} int handle_request_stats */
1270
1271 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1272     char *buffer, size_t buffer_size)
1273 {
1274   char *file, file_tmp[PATH_MAX];
1275   int status;
1276
1277   status = buffer_get_field (&buffer, &buffer_size, &file);
1278   if (status != 0)
1279   {
1280     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1281   }
1282   else
1283   {
1284     pthread_mutex_lock(&stats_lock);
1285     stats_flush_received++;
1286     pthread_mutex_unlock(&stats_lock);
1287
1288     get_abs_path(&file, file_tmp);
1289     if (!check_file_access(file, sock)) return 0;
1290
1291     status = flush_file (file);
1292     if (status == 0)
1293       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1294     else if (status == ENOENT)
1295     {
1296       /* no file in our tree; see whether it exists at all */
1297       struct stat statbuf;
1298
1299       memset(&statbuf, 0, sizeof(statbuf));
1300       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1301         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1302       else
1303         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1304     }
1305     else if (status < 0)
1306       return send_response(sock, RESP_ERR, "Internal error.\n");
1307     else
1308       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1309   }
1310
1311   /* NOTREACHED */
1312   assert(1==0);
1313 } /* }}} int handle_request_flush */
1314
1315 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1316 {
1317   int status;
1318
1319   status = has_privilege(sock, PRIV_HIGH);
1320   if (status <= 0)
1321     return status;
1322
1323   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1324
1325   pthread_mutex_lock(&cache_lock);
1326   flush_old_values(-1);
1327   pthread_mutex_unlock(&cache_lock);
1328
1329   return send_response(sock, RESP_OK, "Started flush.\n");
1330 } /* }}} static int handle_request_flushall */
1331
1332 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1333                                   char *buffer, size_t buffer_size)
1334 {
1335   int status;
1336   char *file, file_tmp[PATH_MAX];
1337   cache_item_t *ci;
1338
1339   status = buffer_get_field(&buffer, &buffer_size, &file);
1340   if (status != 0)
1341     return send_response(sock, RESP_ERR,
1342                          "Usage: PENDING <filename>\n");
1343
1344   status = has_privilege(sock, PRIV_HIGH);
1345   if (status <= 0)
1346     return status;
1347
1348   get_abs_path(&file, file_tmp);
1349
1350   pthread_mutex_lock(&cache_lock);
1351   ci = g_tree_lookup(cache_tree, file);
1352   if (ci == NULL)
1353   {
1354     pthread_mutex_unlock(&cache_lock);
1355     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1356   }
1357
1358   for (int i=0; i < ci->values_num; i++)
1359     add_response_info(sock, "%s\n", ci->values[i]);
1360
1361   pthread_mutex_unlock(&cache_lock);
1362   return send_response(sock, RESP_OK, "updates pending\n");
1363 } /* }}} static int handle_request_pending */
1364
1365 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1366                                  char *buffer, size_t buffer_size)
1367 {
1368   int status;
1369   gboolean found;
1370   char *file, file_tmp[PATH_MAX];
1371
1372   status = buffer_get_field(&buffer, &buffer_size, &file);
1373   if (status != 0)
1374     return send_response(sock, RESP_ERR,
1375                          "Usage: FORGET <filename>\n");
1376
1377   status = has_privilege(sock, PRIV_HIGH);
1378   if (status <= 0)
1379     return status;
1380
1381   get_abs_path(&file, file_tmp);
1382   if (!check_file_access(file, sock)) return 0;
1383
1384   pthread_mutex_lock(&cache_lock);
1385   found = g_tree_remove(cache_tree, file);
1386   pthread_mutex_unlock(&cache_lock);
1387
1388   if (found == TRUE)
1389   {
1390     if (sock != NULL)
1391       journal_write("forget", file);
1392
1393     return send_response(sock, RESP_OK, "Gone!\n");
1394   }
1395   else
1396     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1397
1398   /* NOTREACHED */
1399   assert(1==0);
1400 } /* }}} static int handle_request_forget */
1401
1402 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1403 {
1404   cache_item_t *ci;
1405
1406   pthread_mutex_lock(&cache_lock);
1407
1408   ci = cache_queue_head;
1409   while (ci != NULL)
1410   {
1411     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1412     ci = ci->next;
1413   }
1414
1415   pthread_mutex_unlock(&cache_lock);
1416
1417   return send_response(sock, RESP_OK, "in queue.\n");
1418 } /* }}} int handle_request_queue */
1419
1420 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1421                                   time_t now,
1422                                   char *buffer, size_t buffer_size)
1423 {
1424   char *file, file_tmp[PATH_MAX];
1425   int values_num = 0;
1426   int status;
1427   char orig_buf[CMD_MAX];
1428
1429   cache_item_t *ci;
1430
1431   status = has_privilege(sock, PRIV_HIGH);
1432   if (status <= 0)
1433     return status;
1434
1435   /* save it for the journal later */
1436   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1437
1438   status = buffer_get_field (&buffer, &buffer_size, &file);
1439   if (status != 0)
1440     return send_response(sock, RESP_ERR,
1441                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1442
1443   pthread_mutex_lock(&stats_lock);
1444   stats_updates_received++;
1445   pthread_mutex_unlock(&stats_lock);
1446
1447   get_abs_path(&file, file_tmp);
1448   if (!check_file_access(file, sock)) return 0;
1449
1450   pthread_mutex_lock (&cache_lock);
1451   ci = g_tree_lookup (cache_tree, file);
1452
1453   if (ci == NULL) /* {{{ */
1454   {
1455     struct stat statbuf;
1456
1457     /* don't hold the lock while we setup; stat(2) might block */
1458     pthread_mutex_unlock(&cache_lock);
1459
1460     memset (&statbuf, 0, sizeof (statbuf));
1461     status = stat (file, &statbuf);
1462     if (status != 0)
1463     {
1464       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1465
1466       status = errno;
1467       if (status == ENOENT)
1468         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1469       else
1470         return send_response(sock, RESP_ERR,
1471                              "stat failed with error %i.\n", status);
1472     }
1473     if (!S_ISREG (statbuf.st_mode))
1474       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1475
1476     if (access(file, R_OK|W_OK) != 0)
1477       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1478                            file, rrd_strerror(errno));
1479
1480     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1481     if (ci == NULL)
1482     {
1483       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1484
1485       return send_response(sock, RESP_ERR, "malloc failed.\n");
1486     }
1487     memset (ci, 0, sizeof (cache_item_t));
1488
1489     ci->file = strdup (file);
1490     if (ci->file == NULL)
1491     {
1492       free (ci);
1493       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1494
1495       return send_response(sock, RESP_ERR, "strdup failed.\n");
1496     }
1497
1498     wipe_ci_values(ci, now);
1499     ci->flags = CI_FLAGS_IN_TREE;
1500     pthread_cond_init(&ci->flushed, NULL);
1501
1502     pthread_mutex_lock(&cache_lock);
1503     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1504   } /* }}} */
1505   assert (ci != NULL);
1506
1507   /* don't re-write updates in replay mode */
1508   if (sock != NULL)
1509     journal_write("update", orig_buf);
1510
1511   while (buffer_size > 0)
1512   {
1513     char **temp;
1514     char *value;
1515     time_t stamp;
1516     char *eostamp;
1517
1518     status = buffer_get_field (&buffer, &buffer_size, &value);
1519     if (status != 0)
1520     {
1521       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1522       break;
1523     }
1524
1525     /* make sure update time is always moving forward */
1526     stamp = strtol(value, &eostamp, 10);
1527     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1528     {
1529       pthread_mutex_unlock(&cache_lock);
1530       return send_response(sock, RESP_ERR,
1531                            "Cannot find timestamp in '%s'!\n", value);
1532     }
1533     else if (stamp <= ci->last_update_stamp)
1534     {
1535       pthread_mutex_unlock(&cache_lock);
1536       return send_response(sock, RESP_ERR,
1537                            "illegal attempt to update using time %ld when last"
1538                            " update time is %ld (minimum one second step)\n",
1539                            stamp, ci->last_update_stamp);
1540     }
1541     else
1542       ci->last_update_stamp = stamp;
1543
1544     temp = (char **) rrd_realloc (ci->values,
1545         sizeof (char *) * (ci->values_num + 1));
1546     if (temp == NULL)
1547     {
1548       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1549       continue;
1550     }
1551     ci->values = temp;
1552
1553     ci->values[ci->values_num] = strdup (value);
1554     if (ci->values[ci->values_num] == NULL)
1555     {
1556       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1557       continue;
1558     }
1559     ci->values_num++;
1560
1561     values_num++;
1562   }
1563
1564   if (((now - ci->last_flush_time) >= config_write_interval)
1565       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1566       && (ci->values_num > 0))
1567   {
1568     enqueue_cache_item (ci, TAIL);
1569   }
1570
1571   pthread_mutex_unlock (&cache_lock);
1572
1573   if (values_num < 1)
1574     return send_response(sock, RESP_ERR, "No values updated.\n");
1575   else
1576     return send_response(sock, RESP_OK,
1577                          "errors, enqueued %i value(s).\n", values_num);
1578
1579   /* NOTREACHED */
1580   assert(1==0);
1581
1582 } /* }}} int handle_request_update */
1583
1584 /* we came across a "WROTE" entry during journal replay.
1585  * throw away any values that we have accumulated for this file
1586  */
1587 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1588 {
1589   int i;
1590   cache_item_t *ci;
1591   const char *file = buffer;
1592
1593   pthread_mutex_lock(&cache_lock);
1594
1595   ci = g_tree_lookup(cache_tree, file);
1596   if (ci == NULL)
1597   {
1598     pthread_mutex_unlock(&cache_lock);
1599     return (0);
1600   }
1601
1602   if (ci->values)
1603   {
1604     for (i=0; i < ci->values_num; i++)
1605       free(ci->values[i]);
1606
1607     free(ci->values);
1608   }
1609
1610   wipe_ci_values(ci, now);
1611   remove_from_queue(ci);
1612
1613   pthread_mutex_unlock(&cache_lock);
1614   return (0);
1615 } /* }}} int handle_request_wrote */
1616
1617 /* start "BATCH" processing */
1618 static int batch_start (listen_socket_t *sock) /* {{{ */
1619 {
1620   int status;
1621   if (sock->batch_start)
1622     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1623
1624   status = send_response(sock, RESP_OK,
1625                          "Go ahead.  End with dot '.' on its own line.\n");
1626   sock->batch_start = time(NULL);
1627   sock->batch_cmd = 0;
1628
1629   return status;
1630 } /* }}} static int batch_start */
1631
1632 /* finish "BATCH" processing and return results to the client */
1633 static int batch_done (listen_socket_t *sock) /* {{{ */
1634 {
1635   assert(sock->batch_start);
1636   sock->batch_start = 0;
1637   sock->batch_cmd  = 0;
1638   return send_response(sock, RESP_OK, "errors\n");
1639 } /* }}} static int batch_done */
1640
1641 /* if sock==NULL, we are in journal replay mode */
1642 static int handle_request (listen_socket_t *sock, /* {{{ */
1643                            time_t now,
1644                            char *buffer, size_t buffer_size)
1645 {
1646   char *buffer_ptr;
1647   char *command;
1648   int status;
1649
1650   assert (buffer[buffer_size - 1] == '\0');
1651
1652   buffer_ptr = buffer;
1653   command = NULL;
1654   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1655   if (status != 0)
1656   {
1657     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1658     return (-1);
1659   }
1660
1661   if (sock != NULL && sock->batch_start)
1662     sock->batch_cmd++;
1663
1664   if (strcasecmp (command, "update") == 0)
1665     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1666   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1667   {
1668     /* this is only valid in replay mode */
1669     return (handle_request_wrote (buffer_ptr, now));
1670   }
1671   else if (strcasecmp (command, "flush") == 0)
1672     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1673   else if (strcasecmp (command, "flushall") == 0)
1674     return (handle_request_flushall(sock));
1675   else if (strcasecmp (command, "pending") == 0)
1676     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1677   else if (strcasecmp (command, "forget") == 0)
1678     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1679   else if (strcasecmp (command, "queue") == 0)
1680     return (handle_request_queue(sock));
1681   else if (strcasecmp (command, "stats") == 0)
1682     return (handle_request_stats (sock));
1683   else if (strcasecmp (command, "help") == 0)
1684     return (handle_request_help (sock, buffer_ptr, buffer_size));
1685   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1686     return batch_start(sock);
1687   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1688     return batch_done(sock);
1689   else if (strcasecmp (command, "quit") == 0)
1690     return -1;
1691   else
1692     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1693
1694   /* NOTREACHED */
1695   assert(1==0);
1696 } /* }}} int handle_request */
1697
1698 /* MUST NOT hold journal_lock before calling this */
1699 static void journal_rotate(void) /* {{{ */
1700 {
1701   FILE *old_fh = NULL;
1702   int new_fd;
1703
1704   if (journal_cur == NULL || journal_old == NULL)
1705     return;
1706
1707   pthread_mutex_lock(&journal_lock);
1708
1709   /* we rotate this way (rename before close) so that the we can release
1710    * the journal lock as fast as possible.  Journal writes to the new
1711    * journal can proceed immediately after the new file is opened.  The
1712    * fclose can then block without affecting new updates.
1713    */
1714   if (journal_fh != NULL)
1715   {
1716     old_fh = journal_fh;
1717     journal_fh = NULL;
1718     rename(journal_cur, journal_old);
1719     ++stats_journal_rotate;
1720   }
1721
1722   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1723                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1724   if (new_fd >= 0)
1725   {
1726     journal_fh = fdopen(new_fd, "a");
1727     if (journal_fh == NULL)
1728       close(new_fd);
1729   }
1730
1731   pthread_mutex_unlock(&journal_lock);
1732
1733   if (old_fh != NULL)
1734     fclose(old_fh);
1735
1736   if (journal_fh == NULL)
1737   {
1738     RRDD_LOG(LOG_CRIT,
1739              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1740              journal_cur, rrd_strerror(errno));
1741
1742     RRDD_LOG(LOG_ERR,
1743              "JOURNALING DISABLED: All values will be flushed at shutdown");
1744     config_flush_at_shutdown = 1;
1745   }
1746
1747 } /* }}} static void journal_rotate */
1748
1749 static void journal_done(void) /* {{{ */
1750 {
1751   if (journal_cur == NULL)
1752     return;
1753
1754   pthread_mutex_lock(&journal_lock);
1755   if (journal_fh != NULL)
1756   {
1757     fclose(journal_fh);
1758     journal_fh = NULL;
1759   }
1760
1761   if (config_flush_at_shutdown)
1762   {
1763     RRDD_LOG(LOG_INFO, "removing journals");
1764     unlink(journal_old);
1765     unlink(journal_cur);
1766   }
1767   else
1768   {
1769     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1770              "journals will be used at next startup");
1771   }
1772
1773   pthread_mutex_unlock(&journal_lock);
1774
1775 } /* }}} static void journal_done */
1776
1777 static int journal_write(char *cmd, char *args) /* {{{ */
1778 {
1779   int chars;
1780
1781   if (journal_fh == NULL)
1782     return 0;
1783
1784   pthread_mutex_lock(&journal_lock);
1785   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1786   pthread_mutex_unlock(&journal_lock);
1787
1788   if (chars > 0)
1789   {
1790     pthread_mutex_lock(&stats_lock);
1791     stats_journal_bytes += chars;
1792     pthread_mutex_unlock(&stats_lock);
1793   }
1794
1795   return chars;
1796 } /* }}} static int journal_write */
1797
1798 static int journal_replay (const char *file) /* {{{ */
1799 {
1800   FILE *fh;
1801   int entry_cnt = 0;
1802   int fail_cnt = 0;
1803   uint64_t line = 0;
1804   char entry[CMD_MAX];
1805   time_t now;
1806
1807   if (file == NULL) return 0;
1808
1809   {
1810     char *reason = "unknown error";
1811     int status = 0;
1812     struct stat statbuf;
1813
1814     memset(&statbuf, 0, sizeof(statbuf));
1815     if (stat(file, &statbuf) != 0)
1816     {
1817       if (errno == ENOENT)
1818         return 0;
1819
1820       reason = "stat error";
1821       status = errno;
1822     }
1823     else if (!S_ISREG(statbuf.st_mode))
1824     {
1825       reason = "not a regular file";
1826       status = EPERM;
1827     }
1828     if (statbuf.st_uid != daemon_uid)
1829     {
1830       reason = "not owned by daemon user";
1831       status = EACCES;
1832     }
1833     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1834     {
1835       reason = "must not be user/group writable";
1836       status = EACCES;
1837     }
1838
1839     if (status != 0)
1840     {
1841       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1842                file, rrd_strerror(status), reason);
1843       return 0;
1844     }
1845   }
1846
1847   fh = fopen(file, "r");
1848   if (fh == NULL)
1849   {
1850     if (errno != ENOENT)
1851       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1852                file, rrd_strerror(errno));
1853     return 0;
1854   }
1855   else
1856     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1857
1858   now = time(NULL);
1859
1860   while(!feof(fh))
1861   {
1862     size_t entry_len;
1863
1864     ++line;
1865     if (fgets(entry, sizeof(entry), fh) == NULL)
1866       break;
1867     entry_len = strlen(entry);
1868
1869     /* check \n termination in case journal writing crashed mid-line */
1870     if (entry_len == 0)
1871       continue;
1872     else if (entry[entry_len - 1] != '\n')
1873     {
1874       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1875       ++fail_cnt;
1876       continue;
1877     }
1878
1879     entry[entry_len - 1] = '\0';
1880
1881     if (handle_request(NULL, now, entry, entry_len) == 0)
1882       ++entry_cnt;
1883     else
1884       ++fail_cnt;
1885   }
1886
1887   fclose(fh);
1888
1889   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1890            entry_cnt, fail_cnt);
1891
1892   return entry_cnt > 0 ? 1 : 0;
1893 } /* }}} static int journal_replay */
1894
1895 static void journal_init(void) /* {{{ */
1896 {
1897   int had_journal = 0;
1898
1899   if (journal_cur == NULL) return;
1900
1901   pthread_mutex_lock(&journal_lock);
1902
1903   RRDD_LOG(LOG_INFO, "checking for journal files");
1904
1905   had_journal += journal_replay(journal_old);
1906   had_journal += journal_replay(journal_cur);
1907
1908   /* it must have been a crash.  start a flush */
1909   if (had_journal && config_flush_at_shutdown)
1910     flush_old_values(-1);
1911
1912   pthread_mutex_unlock(&journal_lock);
1913   journal_rotate();
1914
1915   RRDD_LOG(LOG_INFO, "journal processing complete");
1916
1917 } /* }}} static void journal_init */
1918
1919 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1920 {
1921   assert(sock != NULL);
1922
1923   free(sock->rbuf);  sock->rbuf = NULL;
1924   free(sock->wbuf);  sock->wbuf = NULL;
1925   free(sock);
1926 } /* }}} void free_listen_socket */
1927
1928 static void close_connection(listen_socket_t *sock) /* {{{ */
1929 {
1930   if (sock->fd >= 0)
1931   {
1932     close(sock->fd);
1933     sock->fd = -1;
1934   }
1935
1936   free_listen_socket(sock);
1937
1938 } /* }}} void close_connection */
1939
1940 static void *connection_thread_main (void *args) /* {{{ */
1941 {
1942   listen_socket_t *sock;
1943   int i;
1944   int fd;
1945
1946   sock = (listen_socket_t *) args;
1947   fd = sock->fd;
1948
1949   /* init read buffers */
1950   sock->next_read = sock->next_cmd = 0;
1951   sock->rbuf = malloc(RBUF_SIZE);
1952   if (sock->rbuf == NULL)
1953   {
1954     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1955     close_connection(sock);
1956     return NULL;
1957   }
1958
1959   pthread_mutex_lock (&connection_threads_lock);
1960   {
1961     pthread_t *temp;
1962
1963     temp = (pthread_t *) rrd_realloc (connection_threads,
1964         sizeof (pthread_t) * (connection_threads_num + 1));
1965     if (temp == NULL)
1966     {
1967       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1968     }
1969     else
1970     {
1971       connection_threads = temp;
1972       connection_threads[connection_threads_num] = pthread_self ();
1973       connection_threads_num++;
1974     }
1975   }
1976   pthread_mutex_unlock (&connection_threads_lock);
1977
1978   while (do_shutdown == 0)
1979   {
1980     char *cmd;
1981     ssize_t cmd_len;
1982     ssize_t rbytes;
1983     time_t now;
1984
1985     struct pollfd pollfd;
1986     int status;
1987
1988     pollfd.fd = fd;
1989     pollfd.events = POLLIN | POLLPRI;
1990     pollfd.revents = 0;
1991
1992     status = poll (&pollfd, 1, /* timeout = */ 500);
1993     if (do_shutdown)
1994       break;
1995     else if (status == 0) /* timeout */
1996       continue;
1997     else if (status < 0) /* error */
1998     {
1999       status = errno;
2000       if (status != EINTR)
2001         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2002       continue;
2003     }
2004
2005     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2006       break;
2007     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2008     {
2009       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2010           "poll(2) returned something unexpected: %#04hx",
2011           pollfd.revents);
2012       break;
2013     }
2014
2015     rbytes = read(fd, sock->rbuf + sock->next_read,
2016                   RBUF_SIZE - sock->next_read);
2017     if (rbytes < 0)
2018     {
2019       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2020       break;
2021     }
2022     else if (rbytes == 0)
2023       break; /* eof */
2024
2025     sock->next_read += rbytes;
2026
2027     if (sock->batch_start)
2028       now = sock->batch_start;
2029     else
2030       now = time(NULL);
2031
2032     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2033     {
2034       status = handle_request (sock, now, cmd, cmd_len+1);
2035       if (status != 0)
2036         goto out_close;
2037     }
2038   }
2039
2040 out_close:
2041   close_connection(sock);
2042
2043   /* Remove this thread from the connection threads list */
2044   pthread_mutex_lock (&connection_threads_lock);
2045   {
2046     pthread_t self;
2047     pthread_t *temp;
2048
2049     /* Find out own index in the array */
2050     self = pthread_self ();
2051     for (i = 0; i < connection_threads_num; i++)
2052       if (pthread_equal (connection_threads[i], self) != 0)
2053         break;
2054     assert (i < connection_threads_num);
2055
2056     /* Move the trailing threads forward. */
2057     if (i < (connection_threads_num - 1))
2058     {
2059       memmove (connection_threads + i,
2060                connection_threads + i + 1,
2061                sizeof (pthread_t) * (connection_threads_num - i - 1));
2062     }
2063
2064     connection_threads_num--;
2065
2066     temp = rrd_realloc(connection_threads,
2067                    sizeof(*connection_threads) * connection_threads_num);
2068     if (connection_threads_num > 0 && temp == NULL)
2069       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2070     else
2071       connection_threads = temp;
2072   }
2073   pthread_mutex_unlock (&connection_threads_lock);
2074
2075   return (NULL);
2076 } /* }}} void *connection_thread_main */
2077
2078 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2079 {
2080   int fd;
2081   struct sockaddr_un sa;
2082   listen_socket_t *temp;
2083   int status;
2084   const char *path;
2085
2086   path = sock->addr;
2087   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2088     path += strlen("unix:");
2089
2090   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2091       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2092   if (temp == NULL)
2093   {
2094     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2095     return (-1);
2096   }
2097   listen_fds = temp;
2098   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2099
2100   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2101   if (fd < 0)
2102   {
2103     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2104              rrd_strerror(errno));
2105     return (-1);
2106   }
2107
2108   memset (&sa, 0, sizeof (sa));
2109   sa.sun_family = AF_UNIX;
2110   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2111
2112   /* if we've gotten this far, we own the pid file.  any daemon started
2113    * with the same args must not be alive.  therefore, ensure that we can
2114    * create the socket...
2115    */
2116   unlink(path);
2117
2118   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2119   if (status != 0)
2120   {
2121     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2122              path, rrd_strerror(errno));
2123     close (fd);
2124     return (-1);
2125   }
2126
2127   status = listen (fd, /* backlog = */ 10);
2128   if (status != 0)
2129   {
2130     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2131              path, rrd_strerror(errno));
2132     close (fd);
2133     unlink (path);
2134     return (-1);
2135   }
2136
2137   listen_fds[listen_fds_num].fd = fd;
2138   listen_fds[listen_fds_num].family = PF_UNIX;
2139   strncpy(listen_fds[listen_fds_num].addr, path,
2140           sizeof (listen_fds[listen_fds_num].addr) - 1);
2141   listen_fds_num++;
2142
2143   return (0);
2144 } /* }}} int open_listen_socket_unix */
2145
2146 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2147 {
2148   struct addrinfo ai_hints;
2149   struct addrinfo *ai_res;
2150   struct addrinfo *ai_ptr;
2151   char addr_copy[NI_MAXHOST];
2152   char *addr;
2153   char *port;
2154   int status;
2155
2156   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2157   addr_copy[sizeof (addr_copy) - 1] = 0;
2158   addr = addr_copy;
2159
2160   memset (&ai_hints, 0, sizeof (ai_hints));
2161   ai_hints.ai_flags = 0;
2162 #ifdef AI_ADDRCONFIG
2163   ai_hints.ai_flags |= AI_ADDRCONFIG;
2164 #endif
2165   ai_hints.ai_family = AF_UNSPEC;
2166   ai_hints.ai_socktype = SOCK_STREAM;
2167
2168   port = NULL;
2169   if (*addr == '[') /* IPv6+port format */
2170   {
2171     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2172     addr++;
2173
2174     port = strchr (addr, ']');
2175     if (port == NULL)
2176     {
2177       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2178       return (-1);
2179     }
2180     *port = 0;
2181     port++;
2182
2183     if (*port == ':')
2184       port++;
2185     else if (*port == 0)
2186       port = NULL;
2187     else
2188     {
2189       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2190       return (-1);
2191     }
2192   } /* if (*addr = ']') */
2193   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2194   {
2195     port = rindex(addr, ':');
2196     if (port != NULL)
2197     {
2198       *port = 0;
2199       port++;
2200     }
2201   }
2202   ai_res = NULL;
2203   status = getaddrinfo (addr,
2204                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2205                         &ai_hints, &ai_res);
2206   if (status != 0)
2207   {
2208     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2209              addr, gai_strerror (status));
2210     return (-1);
2211   }
2212
2213   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2214   {
2215     int fd;
2216     listen_socket_t *temp;
2217     int one = 1;
2218
2219     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2220         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2221     if (temp == NULL)
2222     {
2223       fprintf (stderr,
2224                "rrdcached: open_listen_socket_network: realloc failed.\n");
2225       continue;
2226     }
2227     listen_fds = temp;
2228     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2229
2230     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2231     if (fd < 0)
2232     {
2233       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2234                rrd_strerror(errno));
2235       continue;
2236     }
2237
2238     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2239
2240     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2241     if (status != 0)
2242     {
2243       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2244                sock->addr, rrd_strerror(errno));
2245       close (fd);
2246       continue;
2247     }
2248
2249     status = listen (fd, /* backlog = */ 10);
2250     if (status != 0)
2251     {
2252       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2253                sock->addr, rrd_strerror(errno));
2254       close (fd);
2255       freeaddrinfo(ai_res);
2256       return (-1);
2257     }
2258
2259     listen_fds[listen_fds_num].fd = fd;
2260     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2261     listen_fds_num++;
2262   } /* for (ai_ptr) */
2263
2264   freeaddrinfo(ai_res);
2265   return (0);
2266 } /* }}} static int open_listen_socket_network */
2267
2268 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2269 {
2270   assert(sock != NULL);
2271   assert(sock->addr != NULL);
2272
2273   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2274       || sock->addr[0] == '/')
2275     return (open_listen_socket_unix(sock));
2276   else
2277     return (open_listen_socket_network(sock));
2278 } /* }}} int open_listen_socket */
2279
2280 static int close_listen_sockets (void) /* {{{ */
2281 {
2282   size_t i;
2283
2284   for (i = 0; i < listen_fds_num; i++)
2285   {
2286     close (listen_fds[i].fd);
2287
2288     if (listen_fds[i].family == PF_UNIX)
2289       unlink(listen_fds[i].addr);
2290   }
2291
2292   free (listen_fds);
2293   listen_fds = NULL;
2294   listen_fds_num = 0;
2295
2296   return (0);
2297 } /* }}} int close_listen_sockets */
2298
2299 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2300 {
2301   struct pollfd *pollfds;
2302   int pollfds_num;
2303   int status;
2304   int i;
2305
2306   if (listen_fds_num < 1)
2307   {
2308     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2309     return (NULL);
2310   }
2311
2312   pollfds_num = listen_fds_num;
2313   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2314   if (pollfds == NULL)
2315   {
2316     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2317     return (NULL);
2318   }
2319   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2320
2321   RRDD_LOG(LOG_INFO, "listening for connections");
2322
2323   while (do_shutdown == 0)
2324   {
2325     for (i = 0; i < pollfds_num; i++)
2326     {
2327       pollfds[i].fd = listen_fds[i].fd;
2328       pollfds[i].events = POLLIN | POLLPRI;
2329       pollfds[i].revents = 0;
2330     }
2331
2332     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2333     if (do_shutdown)
2334       break;
2335     else if (status == 0) /* timeout */
2336       continue;
2337     else if (status < 0) /* error */
2338     {
2339       status = errno;
2340       if (status != EINTR)
2341       {
2342         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2343       }
2344       continue;
2345     }
2346
2347     for (i = 0; i < pollfds_num; i++)
2348     {
2349       listen_socket_t *client_sock;
2350       struct sockaddr_storage client_sa;
2351       socklen_t client_sa_size;
2352       pthread_t tid;
2353       pthread_attr_t attr;
2354
2355       if (pollfds[i].revents == 0)
2356         continue;
2357
2358       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2359       {
2360         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2361             "poll(2) returned something unexpected for listen FD #%i.",
2362             pollfds[i].fd);
2363         continue;
2364       }
2365
2366       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2367       if (client_sock == NULL)
2368       {
2369         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2370         continue;
2371       }
2372       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2373
2374       client_sa_size = sizeof (client_sa);
2375       client_sock->fd = accept (pollfds[i].fd,
2376           (struct sockaddr *) &client_sa, &client_sa_size);
2377       if (client_sock->fd < 0)
2378       {
2379         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2380         free(client_sock);
2381         continue;
2382       }
2383
2384       pthread_attr_init (&attr);
2385       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2386
2387       status = pthread_create (&tid, &attr, connection_thread_main,
2388                                client_sock);
2389       if (status != 0)
2390       {
2391         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2392         close_connection(client_sock);
2393         continue;
2394       }
2395     } /* for (pollfds_num) */
2396   } /* while (do_shutdown == 0) */
2397
2398   RRDD_LOG(LOG_INFO, "starting shutdown");
2399
2400   close_listen_sockets ();
2401
2402   pthread_mutex_lock (&connection_threads_lock);
2403   while (connection_threads_num > 0)
2404   {
2405     pthread_t wait_for;
2406
2407     wait_for = connection_threads[0];
2408
2409     pthread_mutex_unlock (&connection_threads_lock);
2410     pthread_join (wait_for, /* retval = */ NULL);
2411     pthread_mutex_lock (&connection_threads_lock);
2412   }
2413   pthread_mutex_unlock (&connection_threads_lock);
2414
2415   free(pollfds);
2416
2417   return (NULL);
2418 } /* }}} void *listen_thread_main */
2419
2420 static int daemonize (void) /* {{{ */
2421 {
2422   int pid_fd;
2423   char *base_dir;
2424
2425   daemon_uid = geteuid();
2426
2427   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2428   if (pid_fd < 0)
2429     pid_fd = check_pidfile();
2430   if (pid_fd < 0)
2431     return pid_fd;
2432
2433   /* open all the listen sockets */
2434   if (config_listen_address_list_len > 0)
2435   {
2436     for (int i = 0; i < config_listen_address_list_len; i++)
2437     {
2438       open_listen_socket (config_listen_address_list[i]);
2439       free_listen_socket (config_listen_address_list[i]);
2440     }
2441
2442     free(config_listen_address_list);
2443   }
2444   else
2445   {
2446     listen_socket_t sock;
2447     memset(&sock, 0, sizeof(sock));
2448     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2449     open_listen_socket (&sock);
2450   }
2451
2452   if (listen_fds_num < 1)
2453   {
2454     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2455     goto error;
2456   }
2457
2458   if (!stay_foreground)
2459   {
2460     pid_t child;
2461
2462     child = fork ();
2463     if (child < 0)
2464     {
2465       fprintf (stderr, "daemonize: fork(2) failed.\n");
2466       goto error;
2467     }
2468     else if (child > 0)
2469       exit(0);
2470
2471     /* Become session leader */
2472     setsid ();
2473
2474     /* Open the first three file descriptors to /dev/null */
2475     close (2);
2476     close (1);
2477     close (0);
2478
2479     open ("/dev/null", O_RDWR);
2480     dup (0);
2481     dup (0);
2482   } /* if (!stay_foreground) */
2483
2484   /* Change into the /tmp directory. */
2485   base_dir = (config_base_dir != NULL)
2486     ? config_base_dir
2487     : "/tmp";
2488
2489   if (chdir (base_dir) != 0)
2490   {
2491     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2492     goto error;
2493   }
2494
2495   install_signal_handlers();
2496
2497   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2498   RRDD_LOG(LOG_INFO, "starting up");
2499
2500   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2501                                 (GDestroyNotify) free_cache_item);
2502   if (cache_tree == NULL)
2503   {
2504     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2505     goto error;
2506   }
2507
2508   return write_pidfile (pid_fd);
2509
2510 error:
2511   remove_pidfile();
2512   return -1;
2513 } /* }}} int daemonize */
2514
2515 static int cleanup (void) /* {{{ */
2516 {
2517   do_shutdown++;
2518
2519   pthread_cond_signal (&cache_cond);
2520   pthread_join (queue_thread, /* return = */ NULL);
2521
2522   remove_pidfile ();
2523
2524   free(config_base_dir);
2525   free(config_pid_file);
2526   free(journal_cur);
2527   free(journal_old);
2528
2529   pthread_mutex_lock(&cache_lock);
2530   g_tree_destroy(cache_tree);
2531
2532   RRDD_LOG(LOG_INFO, "goodbye");
2533   closelog ();
2534
2535   return (0);
2536 } /* }}} int cleanup */
2537
2538 static int read_options (int argc, char **argv) /* {{{ */
2539 {
2540   int option;
2541   int status = 0;
2542
2543   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2544   {
2545     switch (option)
2546     {
2547       case 'g':
2548         stay_foreground=1;
2549         break;
2550
2551       case 'L':
2552       case 'l':
2553       {
2554         listen_socket_t **temp;
2555         listen_socket_t *new;
2556
2557         new = malloc(sizeof(listen_socket_t));
2558         if (new == NULL)
2559         {
2560           fprintf(stderr, "read_options: malloc failed.\n");
2561           return(2);
2562         }
2563         memset(new, 0, sizeof(listen_socket_t));
2564
2565         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2566             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2567         if (temp == NULL)
2568         {
2569           fprintf (stderr, "read_options: realloc failed.\n");
2570           return (2);
2571         }
2572         config_listen_address_list = temp;
2573
2574         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2575         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2576
2577         temp[config_listen_address_list_len] = new;
2578         config_listen_address_list_len++;
2579       }
2580       break;
2581
2582       case 'f':
2583       {
2584         int temp;
2585
2586         temp = atoi (optarg);
2587         if (temp > 0)
2588           config_flush_interval = temp;
2589         else
2590         {
2591           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2592           status = 3;
2593         }
2594       }
2595       break;
2596
2597       case 'w':
2598       {
2599         int temp;
2600
2601         temp = atoi (optarg);
2602         if (temp > 0)
2603           config_write_interval = temp;
2604         else
2605         {
2606           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2607           status = 2;
2608         }
2609       }
2610       break;
2611
2612       case 'z':
2613       {
2614         int temp;
2615
2616         temp = atoi(optarg);
2617         if (temp > 0)
2618           config_write_jitter = temp;
2619         else
2620         {
2621           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2622           status = 2;
2623         }
2624
2625         break;
2626       }
2627
2628       case 'B':
2629         config_write_base_only = 1;
2630         break;
2631
2632       case 'b':
2633       {
2634         size_t len;
2635         char base_realpath[PATH_MAX];
2636
2637         if (config_base_dir != NULL)
2638           free (config_base_dir);
2639         config_base_dir = strdup (optarg);
2640         if (config_base_dir == NULL)
2641         {
2642           fprintf (stderr, "read_options: strdup failed.\n");
2643           return (3);
2644         }
2645
2646         /* make sure that the base directory is not resolved via
2647          * symbolic links.  this makes some performance-enhancing
2648          * assumptions possible (we don't have to resolve paths
2649          * that start with a "/")
2650          */
2651         if (realpath(config_base_dir, base_realpath) == NULL)
2652         {
2653           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2654           return 5;
2655         }
2656         else if (strncmp(config_base_dir,
2657                          base_realpath, sizeof(base_realpath)) != 0)
2658         {
2659           fprintf(stderr,
2660                   "Base directory (-b) resolved via file system links!\n"
2661                   "Please consult rrdcached '-b' documentation!\n"
2662                   "Consider specifying the real directory (%s)\n",
2663                   base_realpath);
2664           return 5;
2665         }
2666
2667         len = strlen (config_base_dir);
2668         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2669         {
2670           config_base_dir[len - 1] = 0;
2671           len--;
2672         }
2673
2674         if (len < 1)
2675         {
2676           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2677           return (4);
2678         }
2679
2680         _config_base_dir_len = len;
2681       }
2682       break;
2683
2684       case 'p':
2685       {
2686         if (config_pid_file != NULL)
2687           free (config_pid_file);
2688         config_pid_file = strdup (optarg);
2689         if (config_pid_file == NULL)
2690         {
2691           fprintf (stderr, "read_options: strdup failed.\n");
2692           return (3);
2693         }
2694       }
2695       break;
2696
2697       case 'F':
2698         config_flush_at_shutdown = 1;
2699         break;
2700
2701       case 'j':
2702       {
2703         struct stat statbuf;
2704         const char *dir = optarg;
2705
2706         status = stat(dir, &statbuf);
2707         if (status != 0)
2708         {
2709           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2710           return 6;
2711         }
2712
2713         if (!S_ISDIR(statbuf.st_mode)
2714             || access(dir, R_OK|W_OK|X_OK) != 0)
2715         {
2716           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2717                   errno ? rrd_strerror(errno) : "");
2718           return 6;
2719         }
2720
2721         journal_cur = malloc(PATH_MAX + 1);
2722         journal_old = malloc(PATH_MAX + 1);
2723         if (journal_cur == NULL || journal_old == NULL)
2724         {
2725           fprintf(stderr, "malloc failure for journal files\n");
2726           return 6;
2727         }
2728         else 
2729         {
2730           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2731           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2732         }
2733       }
2734       break;
2735
2736       case 'h':
2737       case '?':
2738         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2739             "\n"
2740             "Usage: rrdcached [options]\n"
2741             "\n"
2742             "Valid options are:\n"
2743             "  -l <address>  Socket address to listen to.\n"
2744             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2745             "  -w <seconds>  Interval in which to write data.\n"
2746             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2747             "  -f <seconds>  Interval in which to flush dead data.\n"
2748             "  -p <file>     Location of the PID-file.\n"
2749             "  -b <dir>      Base directory to change to.\n"
2750             "  -B            Restrict file access to paths within -b <dir>\n"
2751             "  -g            Do not fork and run in the foreground.\n"
2752             "  -j <dir>      Directory in which to create the journal files.\n"
2753             "  -F            Always flush all updates at shutdown\n"
2754             "\n"
2755             "For more information and a detailed description of all options "
2756             "please refer\n"
2757             "to the rrdcached(1) manual page.\n",
2758             VERSION);
2759         status = -1;
2760         break;
2761     } /* switch (option) */
2762   } /* while (getopt) */
2763
2764   /* advise the user when values are not sane */
2765   if (config_flush_interval < 2 * config_write_interval)
2766     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2767             " 2x write interval (-w) !\n");
2768   if (config_write_jitter > config_write_interval)
2769     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2770             " write interval (-w) !\n");
2771
2772   if (config_write_base_only && config_base_dir == NULL)
2773     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2774             "  Consult the rrdcached documentation\n");
2775
2776   if (journal_cur == NULL)
2777     config_flush_at_shutdown = 1;
2778
2779   return (status);
2780 } /* }}} int read_options */
2781
2782 int main (int argc, char **argv)
2783 {
2784   int status;
2785
2786   status = read_options (argc, argv);
2787   if (status != 0)
2788   {
2789     if (status < 0)
2790       status = 0;
2791     return (status);
2792   }
2793
2794   status = daemonize ();
2795   if (status != 0)
2796   {
2797     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2798     return (1);
2799   }
2800
2801   journal_init();
2802
2803   /* start the queue thread */
2804   memset (&queue_thread, 0, sizeof (queue_thread));
2805   status = pthread_create (&queue_thread,
2806                            NULL, /* attr */
2807                            queue_thread_main,
2808                            NULL); /* args */
2809   if (status != 0)
2810   {
2811     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2812     cleanup();
2813     return (1);
2814   }
2815
2816   listen_thread_main (NULL);
2817   cleanup ();
2818
2819   return (0);
2820 } /* int main */
2821
2822 /*
2823  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2824  */