Much simpler handling of timestamp errors. Return an error to the user
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
85
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
100
101 #include <glib-2.0/glib.h>
102 /* }}} */
103
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
105
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
109
110 /*
111  * Types
112  */
113 typedef enum
114 {
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
118
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
127
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
131
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
136
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
141
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
158
159 struct callback_flush_data_s
160 {
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
167
168 enum queue_side_e
169 {
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
174
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
178
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
184
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
187
188 static int do_shutdown = 0;
189
190 static pthread_t queue_thread;
191
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
195
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
202
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
211
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
214
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
223
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
232
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
242
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
247
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
252
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
258
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
264
265 static void install_signal_handlers(void) /* {{{ */
266 {
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
274
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
279
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
283
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
287
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
291
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
295
296 } /* }}} void install_signal_handlers */
297
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300   int fd;
301   char *file;
302
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
306
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
311
312   return(fd);
313 } /* }}} static int open_pidfile */
314
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
317 {
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
321
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
325
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
328
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
332
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
342
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
345
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
349
350   return pid_fd;
351 } /* }}} static int check_pidfile */
352
353 static int write_pidfile (int fd) /* {{{ */
354 {
355   pid_t pid;
356   FILE *fh;
357
358   pid = getpid ();
359
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
367
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
370
371   return (0);
372 } /* }}} int write_pidfile */
373
374 static int remove_pidfile (void) /* {{{ */
375 {
376   char *file;
377   int status;
378
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
382
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
388
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391   char *eol;
392
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
395
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
410
411     sock->next_cmd = eol - sock->rbuf + 1;
412
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
415
416     *len = eol - cmd;
417
418     return cmd;
419   }
420
421   /* NOTREACHED */
422   assert(1==0);
423 }
424
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428   char *new_buf;
429
430   assert(sock != NULL);
431
432   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
438
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
440
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
443
444   return 0;
445 } /* }}} static int add_to_wbuf */
446
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
453
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
456
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
469
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
472
473 static int count_lines(char *str) /* {{{ */
474 {
475   int lines = 0;
476
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
485
486   return lines;
487 } /* }}} static int count_lines */
488
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
494 {
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
500
501   if (sock == NULL) return rc;  /* journal replay mode */
502
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
513
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
524
525   len += rclen;
526
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
530
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
537
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
552
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
555
556   return 0;
557 } /* }}} */
558
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561   ci->values = NULL;
562   ci->values_num = 0;
563
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
567 }
568
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
577
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
582
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
587
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
591
592 /* remove an entry from the tree and free all its resources.
593  * must hold 'cache lock' while calling this.
594  * returns 0 on success, otherwise errno */
595 static int forget_file(const char *file)
596 {
597   cache_item_t *ci;
598
599   ci = g_tree_lookup(cache_tree, file);
600   if (ci == NULL)
601     return ENOENT;
602
603   g_tree_remove (cache_tree, file);
604   remove_from_queue(ci);
605
606   for (int i=0; i < ci->values_num; i++)
607     free(ci->values[i]);
608
609   free (ci->values);
610   free (ci->file);
611
612   /* in case anyone is waiting */
613   pthread_cond_broadcast(&ci->flushed);
614
615   free (ci);
616
617   return 0;
618 } /* }}} static int forget_file */
619
620 /*
621  * enqueue_cache_item:
622  * `cache_lock' must be acquired before calling this function!
623  */
624 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
625     queue_side_t side)
626 {
627   if (ci == NULL)
628     return (-1);
629
630   if (ci->values_num == 0)
631     return (0);
632
633   if (side == HEAD)
634   {
635     if (cache_queue_head == ci)
636       return 0;
637
638     /* remove if further down in queue */
639     remove_from_queue(ci);
640
641     ci->prev = NULL;
642     ci->next = cache_queue_head;
643     if (ci->next != NULL)
644       ci->next->prev = ci;
645     cache_queue_head = ci;
646
647     if (cache_queue_tail == NULL)
648       cache_queue_tail = cache_queue_head;
649   }
650   else /* (side == TAIL) */
651   {
652     /* We don't move values back in the list.. */
653     if (ci->flags & CI_FLAGS_IN_QUEUE)
654       return (0);
655
656     assert (ci->next == NULL);
657     assert (ci->prev == NULL);
658
659     ci->prev = cache_queue_tail;
660
661     if (cache_queue_tail == NULL)
662       cache_queue_head = ci;
663     else
664       cache_queue_tail->next = ci;
665
666     cache_queue_tail = ci;
667   }
668
669   ci->flags |= CI_FLAGS_IN_QUEUE;
670
671   pthread_cond_broadcast(&cache_cond);
672   pthread_mutex_lock (&stats_lock);
673   stats_queue_length++;
674   pthread_mutex_unlock (&stats_lock);
675
676   return (0);
677 } /* }}} int enqueue_cache_item */
678
679 /*
680  * tree_callback_flush:
681  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
682  * while this is in progress.
683  */
684 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
685     gpointer data)
686 {
687   cache_item_t *ci;
688   callback_flush_data_t *cfd;
689
690   ci = (cache_item_t *) value;
691   cfd = (callback_flush_data_t *) data;
692
693   if (ci->flags & CI_FLAGS_IN_QUEUE)
694     return FALSE;
695
696   if ((ci->last_flush_time <= cfd->abs_timeout)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if ((do_shutdown != 0)
702       && (ci->values_num > 0))
703   {
704     enqueue_cache_item (ci, TAIL);
705   }
706   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
707       && (ci->values_num <= 0))
708   {
709     char **temp;
710
711     temp = (char **) realloc (cfd->keys,
712         sizeof (char *) * (cfd->keys_num + 1));
713     if (temp == NULL)
714     {
715       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
716       return (FALSE);
717     }
718     cfd->keys = temp;
719     /* Make really sure this points to the _same_ place */
720     assert ((char *) key == ci->file);
721     cfd->keys[cfd->keys_num] = (char *) key;
722     cfd->keys_num++;
723   }
724
725   return (FALSE);
726 } /* }}} gboolean tree_callback_flush */
727
728 static int flush_old_values (int max_age)
729 {
730   callback_flush_data_t cfd;
731   size_t k;
732
733   memset (&cfd, 0, sizeof (cfd));
734   /* Pass the current time as user data so that we don't need to call
735    * `time' for each node. */
736   cfd.now = time (NULL);
737   cfd.keys = NULL;
738   cfd.keys_num = 0;
739
740   if (max_age > 0)
741     cfd.abs_timeout = cfd.now - max_age;
742   else
743     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
744
745   /* `tree_callback_flush' will return the keys of all values that haven't
746    * been touched in the last `config_flush_interval' seconds in `cfd'.
747    * The char*'s in this array point to the same memory as ci->file, so we
748    * don't need to free them separately. */
749   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
750
751   for (k = 0; k < cfd.keys_num; k++)
752   {
753     /* should never fail, since we have held the cache_lock
754      * the entire time */
755     assert( forget_file(cfd.keys[k]) == 0 );
756   }
757
758   if (cfd.keys != NULL)
759   {
760     free (cfd.keys);
761     cfd.keys = NULL;
762   }
763
764   return (0);
765 } /* int flush_old_values */
766
767 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
768 {
769   struct timeval now;
770   struct timespec next_flush;
771   int final_flush = 0; /* make sure we only flush once on shutdown */
772
773   gettimeofday (&now, NULL);
774   next_flush.tv_sec = now.tv_sec + config_flush_interval;
775   next_flush.tv_nsec = 1000 * now.tv_usec;
776
777   pthread_mutex_lock (&cache_lock);
778   while ((do_shutdown == 0) || (cache_queue_head != NULL))
779   {
780     cache_item_t *ci;
781     char *file;
782     char **values;
783     int values_num;
784     int status;
785     int i;
786
787     /* First, check if it's time to do the cache flush. */
788     gettimeofday (&now, NULL);
789     if ((now.tv_sec > next_flush.tv_sec)
790         || ((now.tv_sec == next_flush.tv_sec)
791           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
792     {
793       /* Flush all values that haven't been written in the last
794        * `config_write_interval' seconds. */
795       flush_old_values (config_write_interval);
796
797       /* Determine the time of the next cache flush. */
798       next_flush.tv_sec =
799         now.tv_sec + next_flush.tv_sec % config_flush_interval;
800
801       /* unlock the cache while we rotate so we don't block incoming
802        * updates if the fsync() blocks on disk I/O */
803       pthread_mutex_unlock(&cache_lock);
804       journal_rotate();
805       pthread_mutex_lock(&cache_lock);
806     }
807
808     /* Now, check if there's something to store away. If not, wait until
809      * something comes in or it's time to do the cache flush.  if we are
810      * shutting down, do not wait around.  */
811     if (cache_queue_head == NULL && !do_shutdown)
812     {
813       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
814       if ((status != 0) && (status != ETIMEDOUT))
815       {
816         RRDD_LOG (LOG_ERR, "queue_thread_main: "
817             "pthread_cond_timedwait returned %i.", status);
818       }
819     }
820
821     /* We're about to shut down */
822     if (do_shutdown != 0 && !final_flush++)
823     {
824       if (config_flush_at_shutdown)
825         flush_old_values (-1); /* flush everything */
826       else
827         break;
828     }
829
830     /* Check if a value has arrived. This may be NULL if we timed out or there
831      * was an interrupt such as a signal. */
832     if (cache_queue_head == NULL)
833       continue;
834
835     ci = cache_queue_head;
836
837     /* copy the relevant parts */
838     file = strdup (ci->file);
839     if (file == NULL)
840     {
841       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
842       continue;
843     }
844
845     assert(ci->values != NULL);
846     assert(ci->values_num > 0);
847
848     values = ci->values;
849     values_num = ci->values_num;
850
851     wipe_ci_values(ci, time(NULL));
852     remove_from_queue(ci);
853
854     pthread_mutex_lock (&stats_lock);
855     assert (stats_queue_length > 0);
856     stats_queue_length--;
857     pthread_mutex_unlock (&stats_lock);
858
859     pthread_mutex_unlock (&cache_lock);
860
861     rrd_clear_error ();
862     status = rrd_update_r (file, NULL, values_num, (void *) values);
863     if (status != 0)
864     {
865       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
866           "rrd_update_r (%s) failed with status %i. (%s)",
867           file, status, rrd_get_error());
868     }
869
870     journal_write("wrote", file);
871     pthread_cond_broadcast(&ci->flushed);
872
873     for (i = 0; i < values_num; i++)
874       free (values[i]);
875
876     free(values);
877     free(file);
878
879     if (status == 0)
880     {
881       pthread_mutex_lock (&stats_lock);
882       stats_updates_written++;
883       stats_data_sets_written += values_num;
884       pthread_mutex_unlock (&stats_lock);
885     }
886
887     pthread_mutex_lock (&cache_lock);
888
889     /* We're about to shut down */
890     if (do_shutdown != 0 && !final_flush++)
891     {
892       if (config_flush_at_shutdown)
893           flush_old_values (-1); /* flush everything */
894       else
895         break;
896     }
897   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
898   pthread_mutex_unlock (&cache_lock);
899
900   if (config_flush_at_shutdown)
901   {
902     assert(cache_queue_head == NULL);
903     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
904   }
905
906   journal_done();
907
908   return (NULL);
909 } /* }}} void *queue_thread_main */
910
911 static int buffer_get_field (char **buffer_ret, /* {{{ */
912     size_t *buffer_size_ret, char **field_ret)
913 {
914   char *buffer;
915   size_t buffer_pos;
916   size_t buffer_size;
917   char *field;
918   size_t field_size;
919   int status;
920
921   buffer = *buffer_ret;
922   buffer_pos = 0;
923   buffer_size = *buffer_size_ret;
924   field = *buffer_ret;
925   field_size = 0;
926
927   if (buffer_size <= 0)
928     return (-1);
929
930   /* This is ensured by `handle_request'. */
931   assert (buffer[buffer_size - 1] == '\0');
932
933   status = -1;
934   while (buffer_pos < buffer_size)
935   {
936     /* Check for end-of-field or end-of-buffer */
937     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
938     {
939       field[field_size] = 0;
940       field_size++;
941       buffer_pos++;
942       status = 0;
943       break;
944     }
945     /* Handle escaped characters. */
946     else if (buffer[buffer_pos] == '\\')
947     {
948       if (buffer_pos >= (buffer_size - 1))
949         break;
950       buffer_pos++;
951       field[field_size] = buffer[buffer_pos];
952       field_size++;
953       buffer_pos++;
954     }
955     /* Normal operation */ 
956     else
957     {
958       field[field_size] = buffer[buffer_pos];
959       field_size++;
960       buffer_pos++;
961     }
962   } /* while (buffer_pos < buffer_size) */
963
964   if (status != 0)
965     return (status);
966
967   *buffer_ret = buffer + buffer_pos;
968   *buffer_size_ret = buffer_size - buffer_pos;
969   *field_ret = field;
970
971   return (0);
972 } /* }}} int buffer_get_field */
973
974 /* if we're restricting writes to the base directory,
975  * check whether the file falls within the dir
976  * returns 1 if OK, otherwise 0
977  */
978 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
979 {
980   assert(file != NULL);
981
982   if (!config_write_base_only
983       || sock == NULL /* journal replay */
984       || config_base_dir == NULL)
985     return 1;
986
987   if (strstr(file, "../") != NULL) goto err;
988
989   /* relative paths without "../" are ok */
990   if (*file != '/') return 1;
991
992   /* file must be of the format base + "/" + <1+ char filename> */
993   if (strlen(file) < _config_base_dir_len + 2) goto err;
994   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
995   if (*(file + _config_base_dir_len) != '/') goto err;
996
997   return 1;
998
999 err:
1000   if (sock != NULL && sock->fd >= 0)
1001     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1002
1003   return 0;
1004 } /* }}} static int check_file_access */
1005
1006 /* when using a base dir, convert relative paths to absolute paths.
1007  * if necessary, modifies the "filename" pointer to point
1008  * to the new path created in "tmp".  "tmp" is provided
1009  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1010  *
1011  * this allows us to optimize for the expected case (absolute path)
1012  * with a no-op.
1013  */
1014 static void get_abs_path(char **filename, char *tmp)
1015 {
1016   assert(tmp != NULL);
1017   assert(filename != NULL && *filename != NULL);
1018
1019   if (config_base_dir == NULL || **filename == '/')
1020     return;
1021
1022   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1023   *filename = tmp;
1024 } /* }}} static int get_abs_path */
1025
1026 /* returns 1 if we have the required privilege level,
1027  * otherwise issue an error to the user on sock */
1028 static int has_privilege (listen_socket_t *sock, /* {{{ */
1029                           socket_privilege priv)
1030 {
1031   if (sock == NULL) /* journal replay */
1032     return 1;
1033
1034   if (sock->privilege >= priv)
1035     return 1;
1036
1037   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1038 } /* }}} static int has_privilege */
1039
1040 static int flush_file (const char *filename) /* {{{ */
1041 {
1042   cache_item_t *ci;
1043
1044   pthread_mutex_lock (&cache_lock);
1045
1046   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1047   if (ci == NULL)
1048   {
1049     pthread_mutex_unlock (&cache_lock);
1050     return (ENOENT);
1051   }
1052
1053   if (ci->values_num > 0)
1054   {
1055     /* Enqueue at head */
1056     enqueue_cache_item (ci, HEAD);
1057     pthread_cond_wait(&ci->flushed, &cache_lock);
1058   }
1059
1060   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1061    * may have been purged during our cond_wait() */
1062
1063   pthread_mutex_unlock(&cache_lock);
1064
1065   return (0);
1066 } /* }}} int flush_file */
1067
1068 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1069     char *buffer, size_t buffer_size)
1070 {
1071   int status;
1072   char **help_text;
1073   char *command;
1074
1075   char *help_help[2] =
1076   {
1077     "Command overview\n"
1078     ,
1079     "HELP [<command>]\n"
1080     "FLUSH <filename>\n"
1081     "FLUSHALL\n"
1082     "PENDING <filename>\n"
1083     "FORGET <filename>\n"
1084     "UPDATE <filename> <values> [<values> ...]\n"
1085     "BATCH\n"
1086     "STATS\n"
1087   };
1088
1089   char *help_flush[2] =
1090   {
1091     "Help for FLUSH\n"
1092     ,
1093     "Usage: FLUSH <filename>\n"
1094     "\n"
1095     "Adds the given filename to the head of the update queue and returns\n"
1096     "after is has been dequeued.\n"
1097   };
1098
1099   char *help_flushall[2] =
1100   {
1101     "Help for FLUSHALL\n"
1102     ,
1103     "Usage: FLUSHALL\n"
1104     "\n"
1105     "Triggers writing of all pending updates.  Returns immediately.\n"
1106   };
1107
1108   char *help_pending[2] =
1109   {
1110     "Help for PENDING\n"
1111     ,
1112     "Usage: PENDING <filename>\n"
1113     "\n"
1114     "Shows any 'pending' updates for a file, in order.\n"
1115     "The updates shown have not yet been written to the underlying RRD file.\n"
1116   };
1117
1118   char *help_forget[2] =
1119   {
1120     "Help for FORGET\n"
1121     ,
1122     "Usage: FORGET <filename>\n"
1123     "\n"
1124     "Removes the file completely from the cache.\n"
1125     "Any pending updates for the file will be lost.\n"
1126   };
1127
1128   char *help_update[2] =
1129   {
1130     "Help for UPDATE\n"
1131     ,
1132     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1133     "\n"
1134     "Adds the given file to the internal cache if it is not yet known and\n"
1135     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1136     "for details.\n"
1137     "\n"
1138     "Each <values> has the following form:\n"
1139     "  <values> = <time>:<value>[:<value>[...]]\n"
1140     "See the rrdupdate(1) manpage for details.\n"
1141   };
1142
1143   char *help_stats[2] =
1144   {
1145     "Help for STATS\n"
1146     ,
1147     "Usage: STATS\n"
1148     "\n"
1149     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1150     "a description of the values.\n"
1151   };
1152
1153   char *help_batch[2] =
1154   {
1155     "Help for BATCH\n"
1156     ,
1157     "The 'BATCH' command permits the client to initiate a bulk load\n"
1158     "   of commands to rrdcached.\n"
1159     "\n"
1160     "Usage:\n"
1161     "\n"
1162     "    client: BATCH\n"
1163     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1164     "    client: command #1\n"
1165     "    client: command #2\n"
1166     "    client: ... and so on\n"
1167     "    client: .\n"
1168     "    server: 2 errors\n"
1169     "    server: 7 message for command #7\n"
1170     "    server: 9 message for command #9\n"
1171     "\n"
1172     "For more information, consult the rrdcached(1) documentation.\n"
1173   };
1174
1175   status = buffer_get_field (&buffer, &buffer_size, &command);
1176   if (status != 0)
1177     help_text = help_help;
1178   else
1179   {
1180     if (strcasecmp (command, "update") == 0)
1181       help_text = help_update;
1182     else if (strcasecmp (command, "flush") == 0)
1183       help_text = help_flush;
1184     else if (strcasecmp (command, "flushall") == 0)
1185       help_text = help_flushall;
1186     else if (strcasecmp (command, "pending") == 0)
1187       help_text = help_pending;
1188     else if (strcasecmp (command, "forget") == 0)
1189       help_text = help_forget;
1190     else if (strcasecmp (command, "stats") == 0)
1191       help_text = help_stats;
1192     else if (strcasecmp (command, "batch") == 0)
1193       help_text = help_batch;
1194     else
1195       help_text = help_help;
1196   }
1197
1198   add_response_info(sock, help_text[1]);
1199   return send_response(sock, RESP_OK, help_text[0]);
1200 } /* }}} int handle_request_help */
1201
1202 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1203 {
1204   uint64_t copy_queue_length;
1205   uint64_t copy_updates_received;
1206   uint64_t copy_flush_received;
1207   uint64_t copy_updates_written;
1208   uint64_t copy_data_sets_written;
1209   uint64_t copy_journal_bytes;
1210   uint64_t copy_journal_rotate;
1211
1212   uint64_t tree_nodes_number;
1213   uint64_t tree_depth;
1214
1215   pthread_mutex_lock (&stats_lock);
1216   copy_queue_length       = stats_queue_length;
1217   copy_updates_received   = stats_updates_received;
1218   copy_flush_received     = stats_flush_received;
1219   copy_updates_written    = stats_updates_written;
1220   copy_data_sets_written  = stats_data_sets_written;
1221   copy_journal_bytes      = stats_journal_bytes;
1222   copy_journal_rotate     = stats_journal_rotate;
1223   pthread_mutex_unlock (&stats_lock);
1224
1225   pthread_mutex_lock (&cache_lock);
1226   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1227   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1228   pthread_mutex_unlock (&cache_lock);
1229
1230   add_response_info(sock,
1231                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1232   add_response_info(sock,
1233                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1234   add_response_info(sock,
1235                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1236   add_response_info(sock,
1237                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1238   add_response_info(sock,
1239                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1240   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1241   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1242   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1243   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1244
1245   send_response(sock, RESP_OK, "Statistics follow\n");
1246
1247   return (0);
1248 } /* }}} int handle_request_stats */
1249
1250 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1251     char *buffer, size_t buffer_size)
1252 {
1253   char *file, file_tmp[PATH_MAX];
1254   int status;
1255
1256   status = buffer_get_field (&buffer, &buffer_size, &file);
1257   if (status != 0)
1258   {
1259     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1260   }
1261   else
1262   {
1263     pthread_mutex_lock(&stats_lock);
1264     stats_flush_received++;
1265     pthread_mutex_unlock(&stats_lock);
1266
1267     get_abs_path(&file, file_tmp);
1268     if (!check_file_access(file, sock)) return 0;
1269
1270     status = flush_file (file);
1271     if (status == 0)
1272       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1273     else if (status == ENOENT)
1274     {
1275       /* no file in our tree; see whether it exists at all */
1276       struct stat statbuf;
1277
1278       memset(&statbuf, 0, sizeof(statbuf));
1279       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1280         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1281       else
1282         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1283     }
1284     else if (status < 0)
1285       return send_response(sock, RESP_ERR, "Internal error.\n");
1286     else
1287       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1288   }
1289
1290   /* NOTREACHED */
1291   assert(1==0);
1292 } /* }}} int handle_request_flush */
1293
1294 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1295 {
1296   int status;
1297
1298   status = has_privilege(sock, PRIV_HIGH);
1299   if (status <= 0)
1300     return status;
1301
1302   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1303
1304   pthread_mutex_lock(&cache_lock);
1305   flush_old_values(-1);
1306   pthread_mutex_unlock(&cache_lock);
1307
1308   return send_response(sock, RESP_OK, "Started flush.\n");
1309 } /* }}} static int handle_request_flushall */
1310
1311 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1312                                   char *buffer, size_t buffer_size)
1313 {
1314   int status;
1315   char *file, file_tmp[PATH_MAX];
1316   cache_item_t *ci;
1317
1318   status = buffer_get_field(&buffer, &buffer_size, &file);
1319   if (status != 0)
1320     return send_response(sock, RESP_ERR,
1321                          "Usage: PENDING <filename>\n");
1322
1323   status = has_privilege(sock, PRIV_HIGH);
1324   if (status <= 0)
1325     return status;
1326
1327   get_abs_path(&file, file_tmp);
1328
1329   pthread_mutex_lock(&cache_lock);
1330   ci = g_tree_lookup(cache_tree, file);
1331   if (ci == NULL)
1332   {
1333     pthread_mutex_unlock(&cache_lock);
1334     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1335   }
1336
1337   for (int i=0; i < ci->values_num; i++)
1338     add_response_info(sock, "%s\n", ci->values[i]);
1339
1340   pthread_mutex_unlock(&cache_lock);
1341   return send_response(sock, RESP_OK, "updates pending\n");
1342 } /* }}} static int handle_request_pending */
1343
1344 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1345                                  char *buffer, size_t buffer_size)
1346 {
1347   int status;
1348   char *file, file_tmp[PATH_MAX];
1349
1350   status = buffer_get_field(&buffer, &buffer_size, &file);
1351   if (status != 0)
1352     return send_response(sock, RESP_ERR,
1353                          "Usage: FORGET <filename>\n");
1354
1355   status = has_privilege(sock, PRIV_HIGH);
1356   if (status <= 0)
1357     return status;
1358
1359   get_abs_path(&file, file_tmp);
1360   if (!check_file_access(file, sock)) return 0;
1361
1362   pthread_mutex_lock(&cache_lock);
1363   status = forget_file(file);
1364   pthread_mutex_unlock(&cache_lock);
1365
1366   if (status == 0)
1367   {
1368     if (sock != NULL)
1369       journal_write("forget", file);
1370
1371     return send_response(sock, RESP_OK, "Gone!\n");
1372   }
1373   else
1374     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1375                          status < 0 ? "Internal error" : rrd_strerror(status));
1376
1377   /* NOTREACHED */
1378   assert(1==0);
1379 } /* }}} static int handle_request_forget */
1380
1381 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1382                                   time_t now,
1383                                   char *buffer, size_t buffer_size)
1384 {
1385   char *file, file_tmp[PATH_MAX];
1386   int values_num = 0;
1387   int status;
1388   char orig_buf[CMD_MAX];
1389
1390   cache_item_t *ci;
1391
1392   status = has_privilege(sock, PRIV_HIGH);
1393   if (status <= 0)
1394     return status;
1395
1396   /* save it for the journal later */
1397   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1398
1399   status = buffer_get_field (&buffer, &buffer_size, &file);
1400   if (status != 0)
1401     return send_response(sock, RESP_ERR,
1402                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1403
1404   pthread_mutex_lock(&stats_lock);
1405   stats_updates_received++;
1406   pthread_mutex_unlock(&stats_lock);
1407
1408   get_abs_path(&file, file_tmp);
1409   if (!check_file_access(file, sock)) return 0;
1410
1411   pthread_mutex_lock (&cache_lock);
1412   ci = g_tree_lookup (cache_tree, file);
1413
1414   if (ci == NULL) /* {{{ */
1415   {
1416     struct stat statbuf;
1417
1418     /* don't hold the lock while we setup; stat(2) might block */
1419     pthread_mutex_unlock(&cache_lock);
1420
1421     memset (&statbuf, 0, sizeof (statbuf));
1422     status = stat (file, &statbuf);
1423     if (status != 0)
1424     {
1425       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1426
1427       status = errno;
1428       if (status == ENOENT)
1429         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1430       else
1431         return send_response(sock, RESP_ERR,
1432                              "stat failed with error %i.\n", status);
1433     }
1434     if (!S_ISREG (statbuf.st_mode))
1435       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1436
1437     if (access(file, R_OK|W_OK) != 0)
1438       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1439                            file, rrd_strerror(errno));
1440
1441     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1442     if (ci == NULL)
1443     {
1444       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1445
1446       return send_response(sock, RESP_ERR, "malloc failed.\n");
1447     }
1448     memset (ci, 0, sizeof (cache_item_t));
1449
1450     ci->file = strdup (file);
1451     if (ci->file == NULL)
1452     {
1453       free (ci);
1454       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1455
1456       return send_response(sock, RESP_ERR, "strdup failed.\n");
1457     }
1458
1459     wipe_ci_values(ci, now);
1460     ci->flags = CI_FLAGS_IN_TREE;
1461     pthread_cond_init(&ci->flushed, NULL);
1462
1463     pthread_mutex_lock(&cache_lock);
1464     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1465   } /* }}} */
1466   assert (ci != NULL);
1467
1468   /* don't re-write updates in replay mode */
1469   if (sock != NULL)
1470     journal_write("update", orig_buf);
1471
1472   while (buffer_size > 0)
1473   {
1474     char **temp;
1475     char *value;
1476     time_t stamp;
1477     char *eostamp;
1478
1479     status = buffer_get_field (&buffer, &buffer_size, &value);
1480     if (status != 0)
1481     {
1482       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1483       break;
1484     }
1485
1486     /* make sure update time is always moving forward */
1487     stamp = strtol(value, &eostamp, 10);
1488     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1489     {
1490       pthread_mutex_unlock(&cache_lock);
1491       return send_response(sock, RESP_ERR,
1492                            "Cannot find timestamp in '%s'!\n", value);
1493     }
1494     else if (stamp <= ci->last_update_stamp)
1495     {
1496       pthread_mutex_unlock(&cache_lock);
1497       return send_response(sock, RESP_ERR,
1498                            "illegal attempt to update using time %ld when last"
1499                            " update time is %ld (minimum one second step)\n",
1500                            stamp, ci->last_update_stamp);
1501     }
1502     else
1503       ci->last_update_stamp = stamp;
1504
1505     temp = (char **) realloc (ci->values,
1506         sizeof (char *) * (ci->values_num + 1));
1507     if (temp == NULL)
1508     {
1509       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1510       continue;
1511     }
1512     ci->values = temp;
1513
1514     ci->values[ci->values_num] = strdup (value);
1515     if (ci->values[ci->values_num] == NULL)
1516     {
1517       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1518       continue;
1519     }
1520     ci->values_num++;
1521
1522     values_num++;
1523   }
1524
1525   if (((now - ci->last_flush_time) >= config_write_interval)
1526       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1527       && (ci->values_num > 0))
1528   {
1529     enqueue_cache_item (ci, TAIL);
1530   }
1531
1532   pthread_mutex_unlock (&cache_lock);
1533
1534   if (values_num < 1)
1535     return send_response(sock, RESP_ERR, "No values updated.\n");
1536   else
1537     return send_response(sock, RESP_OK,
1538                          "errors, enqueued %i value(s).\n", values_num);
1539
1540   /* NOTREACHED */
1541   assert(1==0);
1542
1543 } /* }}} int handle_request_update */
1544
1545 /* we came across a "WROTE" entry during journal replay.
1546  * throw away any values that we have accumulated for this file
1547  */
1548 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1549 {
1550   int i;
1551   cache_item_t *ci;
1552   const char *file = buffer;
1553
1554   pthread_mutex_lock(&cache_lock);
1555
1556   ci = g_tree_lookup(cache_tree, file);
1557   if (ci == NULL)
1558   {
1559     pthread_mutex_unlock(&cache_lock);
1560     return (0);
1561   }
1562
1563   if (ci->values)
1564   {
1565     for (i=0; i < ci->values_num; i++)
1566       free(ci->values[i]);
1567
1568     free(ci->values);
1569   }
1570
1571   wipe_ci_values(ci, now);
1572   remove_from_queue(ci);
1573
1574   pthread_mutex_unlock(&cache_lock);
1575   return (0);
1576 } /* }}} int handle_request_wrote */
1577
1578 /* start "BATCH" processing */
1579 static int batch_start (listen_socket_t *sock) /* {{{ */
1580 {
1581   int status;
1582   if (sock->batch_start)
1583     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1584
1585   status = send_response(sock, RESP_OK,
1586                          "Go ahead.  End with dot '.' on its own line.\n");
1587   sock->batch_start = time(NULL);
1588   sock->batch_cmd = 0;
1589
1590   return status;
1591 } /* }}} static int batch_start */
1592
1593 /* finish "BATCH" processing and return results to the client */
1594 static int batch_done (listen_socket_t *sock) /* {{{ */
1595 {
1596   assert(sock->batch_start);
1597   sock->batch_start = 0;
1598   sock->batch_cmd  = 0;
1599   return send_response(sock, RESP_OK, "errors\n");
1600 } /* }}} static int batch_done */
1601
1602 /* if sock==NULL, we are in journal replay mode */
1603 static int handle_request (listen_socket_t *sock, /* {{{ */
1604                            time_t now,
1605                            char *buffer, size_t buffer_size)
1606 {
1607   char *buffer_ptr;
1608   char *command;
1609   int status;
1610
1611   assert (buffer[buffer_size - 1] == '\0');
1612
1613   buffer_ptr = buffer;
1614   command = NULL;
1615   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1616   if (status != 0)
1617   {
1618     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1619     return (-1);
1620   }
1621
1622   if (sock != NULL && sock->batch_start)
1623     sock->batch_cmd++;
1624
1625   if (strcasecmp (command, "update") == 0)
1626     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1627   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1628   {
1629     /* this is only valid in replay mode */
1630     return (handle_request_wrote (buffer_ptr, now));
1631   }
1632   else if (strcasecmp (command, "flush") == 0)
1633     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1634   else if (strcasecmp (command, "flushall") == 0)
1635     return (handle_request_flushall(sock));
1636   else if (strcasecmp (command, "pending") == 0)
1637     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1638   else if (strcasecmp (command, "forget") == 0)
1639     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1640   else if (strcasecmp (command, "stats") == 0)
1641     return (handle_request_stats (sock));
1642   else if (strcasecmp (command, "help") == 0)
1643     return (handle_request_help (sock, buffer_ptr, buffer_size));
1644   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1645     return batch_start(sock);
1646   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1647     return batch_done(sock);
1648   else
1649     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1650
1651   /* NOTREACHED */
1652   assert(1==0);
1653 } /* }}} int handle_request */
1654
1655 /* MUST NOT hold journal_lock before calling this */
1656 static void journal_rotate(void) /* {{{ */
1657 {
1658   FILE *old_fh = NULL;
1659   int new_fd;
1660
1661   if (journal_cur == NULL || journal_old == NULL)
1662     return;
1663
1664   pthread_mutex_lock(&journal_lock);
1665
1666   /* we rotate this way (rename before close) so that the we can release
1667    * the journal lock as fast as possible.  Journal writes to the new
1668    * journal can proceed immediately after the new file is opened.  The
1669    * fclose can then block without affecting new updates.
1670    */
1671   if (journal_fh != NULL)
1672   {
1673     old_fh = journal_fh;
1674     journal_fh = NULL;
1675     rename(journal_cur, journal_old);
1676     ++stats_journal_rotate;
1677   }
1678
1679   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1680                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1681   if (new_fd >= 0)
1682   {
1683     journal_fh = fdopen(new_fd, "a");
1684     if (journal_fh == NULL)
1685       close(new_fd);
1686   }
1687
1688   pthread_mutex_unlock(&journal_lock);
1689
1690   if (old_fh != NULL)
1691     fclose(old_fh);
1692
1693   if (journal_fh == NULL)
1694   {
1695     RRDD_LOG(LOG_CRIT,
1696              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1697              journal_cur, rrd_strerror(errno));
1698
1699     RRDD_LOG(LOG_ERR,
1700              "JOURNALING DISABLED: All values will be flushed at shutdown");
1701     config_flush_at_shutdown = 1;
1702   }
1703
1704 } /* }}} static void journal_rotate */
1705
1706 static void journal_done(void) /* {{{ */
1707 {
1708   if (journal_cur == NULL)
1709     return;
1710
1711   pthread_mutex_lock(&journal_lock);
1712   if (journal_fh != NULL)
1713   {
1714     fclose(journal_fh);
1715     journal_fh = NULL;
1716   }
1717
1718   if (config_flush_at_shutdown)
1719   {
1720     RRDD_LOG(LOG_INFO, "removing journals");
1721     unlink(journal_old);
1722     unlink(journal_cur);
1723   }
1724   else
1725   {
1726     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1727              "journals will be used at next startup");
1728   }
1729
1730   pthread_mutex_unlock(&journal_lock);
1731
1732 } /* }}} static void journal_done */
1733
1734 static int journal_write(char *cmd, char *args) /* {{{ */
1735 {
1736   int chars;
1737
1738   if (journal_fh == NULL)
1739     return 0;
1740
1741   pthread_mutex_lock(&journal_lock);
1742   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1743   pthread_mutex_unlock(&journal_lock);
1744
1745   if (chars > 0)
1746   {
1747     pthread_mutex_lock(&stats_lock);
1748     stats_journal_bytes += chars;
1749     pthread_mutex_unlock(&stats_lock);
1750   }
1751
1752   return chars;
1753 } /* }}} static int journal_write */
1754
1755 static int journal_replay (const char *file) /* {{{ */
1756 {
1757   FILE *fh;
1758   int entry_cnt = 0;
1759   int fail_cnt = 0;
1760   uint64_t line = 0;
1761   char entry[CMD_MAX];
1762   time_t now;
1763
1764   if (file == NULL) return 0;
1765
1766   {
1767     char *reason;
1768     int status = 0;
1769     struct stat statbuf;
1770
1771     memset(&statbuf, 0, sizeof(statbuf));
1772     if (stat(file, &statbuf) != 0)
1773     {
1774       if (errno == ENOENT)
1775         return 0;
1776
1777       reason = "stat error";
1778       status = errno;
1779     }
1780     else if (!S_ISREG(statbuf.st_mode))
1781     {
1782       reason = "not a regular file";
1783       status = EPERM;
1784     }
1785     if (statbuf.st_uid != daemon_uid)
1786     {
1787       reason = "not owned by daemon user";
1788       status = EACCES;
1789     }
1790     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1791     {
1792       reason = "must not be user/group writable";
1793       status = EACCES;
1794     }
1795
1796     if (status != 0)
1797     {
1798       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1799                file, rrd_strerror(status), reason);
1800       return 0;
1801     }
1802   }
1803
1804   fh = fopen(file, "r");
1805   if (fh == NULL)
1806   {
1807     if (errno != ENOENT)
1808       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1809                file, rrd_strerror(errno));
1810     return 0;
1811   }
1812   else
1813     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1814
1815   now = time(NULL);
1816
1817   while(!feof(fh))
1818   {
1819     size_t entry_len;
1820
1821     ++line;
1822     if (fgets(entry, sizeof(entry), fh) == NULL)
1823       break;
1824     entry_len = strlen(entry);
1825
1826     /* check \n termination in case journal writing crashed mid-line */
1827     if (entry_len == 0)
1828       continue;
1829     else if (entry[entry_len - 1] != '\n')
1830     {
1831       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1832       ++fail_cnt;
1833       continue;
1834     }
1835
1836     entry[entry_len - 1] = '\0';
1837
1838     if (handle_request(NULL, now, entry, entry_len) == 0)
1839       ++entry_cnt;
1840     else
1841       ++fail_cnt;
1842   }
1843
1844   fclose(fh);
1845
1846   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1847            entry_cnt, fail_cnt);
1848
1849   return entry_cnt > 0 ? 1 : 0;
1850 } /* }}} static int journal_replay */
1851
1852 static void journal_init(void) /* {{{ */
1853 {
1854   int had_journal = 0;
1855
1856   if (journal_cur == NULL) return;
1857
1858   pthread_mutex_lock(&journal_lock);
1859
1860   RRDD_LOG(LOG_INFO, "checking for journal files");
1861
1862   had_journal += journal_replay(journal_old);
1863   had_journal += journal_replay(journal_cur);
1864
1865   /* it must have been a crash.  start a flush */
1866   if (had_journal && config_flush_at_shutdown)
1867     flush_old_values(-1);
1868
1869   pthread_mutex_unlock(&journal_lock);
1870   journal_rotate();
1871
1872   RRDD_LOG(LOG_INFO, "journal processing complete");
1873
1874 } /* }}} static void journal_init */
1875
1876 static void close_connection(listen_socket_t *sock)
1877 {
1878   close(sock->fd) ;  sock->fd   = -1;
1879   free(sock->rbuf);  sock->rbuf = NULL;
1880   free(sock->wbuf);  sock->wbuf = NULL;
1881
1882   free(sock);
1883 }
1884
1885 static void *connection_thread_main (void *args) /* {{{ */
1886 {
1887   pthread_t self;
1888   listen_socket_t *sock;
1889   int i;
1890   int fd;
1891
1892   sock = (listen_socket_t *) args;
1893   fd = sock->fd;
1894
1895   /* init read buffers */
1896   sock->next_read = sock->next_cmd = 0;
1897   sock->rbuf = malloc(RBUF_SIZE);
1898   if (sock->rbuf == NULL)
1899   {
1900     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1901     close_connection(sock);
1902     return NULL;
1903   }
1904
1905   pthread_mutex_lock (&connection_threads_lock);
1906   {
1907     pthread_t *temp;
1908
1909     temp = (pthread_t *) realloc (connection_threads,
1910         sizeof (pthread_t) * (connection_threads_num + 1));
1911     if (temp == NULL)
1912     {
1913       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1914     }
1915     else
1916     {
1917       connection_threads = temp;
1918       connection_threads[connection_threads_num] = pthread_self ();
1919       connection_threads_num++;
1920     }
1921   }
1922   pthread_mutex_unlock (&connection_threads_lock);
1923
1924   while (do_shutdown == 0)
1925   {
1926     char *cmd;
1927     ssize_t cmd_len;
1928     ssize_t rbytes;
1929     time_t now;
1930
1931     struct pollfd pollfd;
1932     int status;
1933
1934     pollfd.fd = fd;
1935     pollfd.events = POLLIN | POLLPRI;
1936     pollfd.revents = 0;
1937
1938     status = poll (&pollfd, 1, /* timeout = */ 500);
1939     if (do_shutdown)
1940       break;
1941     else if (status == 0) /* timeout */
1942       continue;
1943     else if (status < 0) /* error */
1944     {
1945       status = errno;
1946       if (status != EINTR)
1947         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1948       continue;
1949     }
1950
1951     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1952       break;
1953     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1954     {
1955       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1956           "poll(2) returned something unexpected: %#04hx",
1957           pollfd.revents);
1958       break;
1959     }
1960
1961     rbytes = read(fd, sock->rbuf + sock->next_read,
1962                   RBUF_SIZE - sock->next_read);
1963     if (rbytes < 0)
1964     {
1965       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1966       break;
1967     }
1968     else if (rbytes == 0)
1969       break; /* eof */
1970
1971     sock->next_read += rbytes;
1972
1973     if (sock->batch_start)
1974       now = sock->batch_start;
1975     else
1976       now = time(NULL);
1977
1978     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1979     {
1980       status = handle_request (sock, now, cmd, cmd_len+1);
1981       if (status != 0)
1982         goto out_close;
1983     }
1984   }
1985
1986 out_close:
1987   close_connection(sock);
1988
1989   self = pthread_self ();
1990   /* Remove this thread from the connection threads list */
1991   pthread_mutex_lock (&connection_threads_lock);
1992   /* Find out own index in the array */
1993   for (i = 0; i < connection_threads_num; i++)
1994     if (pthread_equal (connection_threads[i], self) != 0)
1995       break;
1996   assert (i < connection_threads_num);
1997
1998   /* Move the trailing threads forward. */
1999   if (i < (connection_threads_num - 1))
2000   {
2001     memmove (connection_threads + i,
2002         connection_threads + i + 1,
2003         sizeof (pthread_t) * (connection_threads_num - i - 1));
2004   }
2005
2006   connection_threads_num--;
2007   pthread_mutex_unlock (&connection_threads_lock);
2008
2009   return (NULL);
2010 } /* }}} void *connection_thread_main */
2011
2012 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2013 {
2014   int fd;
2015   struct sockaddr_un sa;
2016   listen_socket_t *temp;
2017   int status;
2018   const char *path;
2019
2020   path = sock->addr;
2021   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2022     path += strlen("unix:");
2023
2024   temp = (listen_socket_t *) realloc (listen_fds,
2025       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2026   if (temp == NULL)
2027   {
2028     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2029     return (-1);
2030   }
2031   listen_fds = temp;
2032   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2033
2034   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2035   if (fd < 0)
2036   {
2037     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2038              rrd_strerror(errno));
2039     return (-1);
2040   }
2041
2042   memset (&sa, 0, sizeof (sa));
2043   sa.sun_family = AF_UNIX;
2044   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2045
2046   /* if we've gotten this far, we own the pid file.  any daemon started
2047    * with the same args must not be alive.  therefore, ensure that we can
2048    * create the socket...
2049    */
2050   unlink(path);
2051
2052   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2053   if (status != 0)
2054   {
2055     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2056              path, rrd_strerror(errno));
2057     close (fd);
2058     return (-1);
2059   }
2060
2061   status = listen (fd, /* backlog = */ 10);
2062   if (status != 0)
2063   {
2064     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2065              path, rrd_strerror(errno));
2066     close (fd);
2067     unlink (path);
2068     return (-1);
2069   }
2070
2071   listen_fds[listen_fds_num].fd = fd;
2072   listen_fds[listen_fds_num].family = PF_UNIX;
2073   strncpy(listen_fds[listen_fds_num].addr, path,
2074           sizeof (listen_fds[listen_fds_num].addr) - 1);
2075   listen_fds_num++;
2076
2077   return (0);
2078 } /* }}} int open_listen_socket_unix */
2079
2080 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2081 {
2082   struct addrinfo ai_hints;
2083   struct addrinfo *ai_res;
2084   struct addrinfo *ai_ptr;
2085   char addr_copy[NI_MAXHOST];
2086   char *addr;
2087   char *port;
2088   int status;
2089
2090   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2091   addr_copy[sizeof (addr_copy) - 1] = 0;
2092   addr = addr_copy;
2093
2094   memset (&ai_hints, 0, sizeof (ai_hints));
2095   ai_hints.ai_flags = 0;
2096 #ifdef AI_ADDRCONFIG
2097   ai_hints.ai_flags |= AI_ADDRCONFIG;
2098 #endif
2099   ai_hints.ai_family = AF_UNSPEC;
2100   ai_hints.ai_socktype = SOCK_STREAM;
2101
2102   port = NULL;
2103   if (*addr == '[') /* IPv6+port format */
2104   {
2105     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2106     addr++;
2107
2108     port = strchr (addr, ']');
2109     if (port == NULL)
2110     {
2111       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2112       return (-1);
2113     }
2114     *port = 0;
2115     port++;
2116
2117     if (*port == ':')
2118       port++;
2119     else if (*port == 0)
2120       port = NULL;
2121     else
2122     {
2123       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2124       return (-1);
2125     }
2126   } /* if (*addr = ']') */
2127   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2128   {
2129     port = rindex(addr, ':');
2130     if (port != NULL)
2131     {
2132       *port = 0;
2133       port++;
2134     }
2135   }
2136   ai_res = NULL;
2137   status = getaddrinfo (addr,
2138                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2139                         &ai_hints, &ai_res);
2140   if (status != 0)
2141   {
2142     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2143              addr, gai_strerror (status));
2144     return (-1);
2145   }
2146
2147   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2148   {
2149     int fd;
2150     listen_socket_t *temp;
2151     int one = 1;
2152
2153     temp = (listen_socket_t *) realloc (listen_fds,
2154         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2155     if (temp == NULL)
2156     {
2157       fprintf (stderr,
2158                "rrdcached: open_listen_socket_network: realloc failed.\n");
2159       continue;
2160     }
2161     listen_fds = temp;
2162     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2163
2164     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2165     if (fd < 0)
2166     {
2167       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2168                rrd_strerror(errno));
2169       continue;
2170     }
2171
2172     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2173
2174     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2175     if (status != 0)
2176     {
2177       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2178                sock->addr, rrd_strerror(errno));
2179       close (fd);
2180       continue;
2181     }
2182
2183     status = listen (fd, /* backlog = */ 10);
2184     if (status != 0)
2185     {
2186       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2187                sock->addr, rrd_strerror(errno));
2188       close (fd);
2189       return (-1);
2190     }
2191
2192     listen_fds[listen_fds_num].fd = fd;
2193     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2194     listen_fds_num++;
2195   } /* for (ai_ptr) */
2196
2197   return (0);
2198 } /* }}} static int open_listen_socket_network */
2199
2200 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2201 {
2202   assert(sock != NULL);
2203   assert(sock->addr != NULL);
2204
2205   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2206       || sock->addr[0] == '/')
2207     return (open_listen_socket_unix(sock));
2208   else
2209     return (open_listen_socket_network(sock));
2210 } /* }}} int open_listen_socket */
2211
2212 static int close_listen_sockets (void) /* {{{ */
2213 {
2214   size_t i;
2215
2216   for (i = 0; i < listen_fds_num; i++)
2217   {
2218     close (listen_fds[i].fd);
2219
2220     if (listen_fds[i].family == PF_UNIX)
2221       unlink(listen_fds[i].addr);
2222   }
2223
2224   free (listen_fds);
2225   listen_fds = NULL;
2226   listen_fds_num = 0;
2227
2228   return (0);
2229 } /* }}} int close_listen_sockets */
2230
2231 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2232 {
2233   struct pollfd *pollfds;
2234   int pollfds_num;
2235   int status;
2236   int i;
2237
2238   if (listen_fds_num < 1)
2239   {
2240     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2241     return (NULL);
2242   }
2243
2244   pollfds_num = listen_fds_num;
2245   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2246   if (pollfds == NULL)
2247   {
2248     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2249     return (NULL);
2250   }
2251   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2252
2253   RRDD_LOG(LOG_INFO, "listening for connections");
2254
2255   while (do_shutdown == 0)
2256   {
2257     assert (pollfds_num == ((int) listen_fds_num));
2258     for (i = 0; i < pollfds_num; i++)
2259     {
2260       pollfds[i].fd = listen_fds[i].fd;
2261       pollfds[i].events = POLLIN | POLLPRI;
2262       pollfds[i].revents = 0;
2263     }
2264
2265     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2266     if (do_shutdown)
2267       break;
2268     else if (status == 0) /* timeout */
2269       continue;
2270     else if (status < 0) /* error */
2271     {
2272       status = errno;
2273       if (status != EINTR)
2274       {
2275         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2276       }
2277       continue;
2278     }
2279
2280     for (i = 0; i < pollfds_num; i++)
2281     {
2282       listen_socket_t *client_sock;
2283       struct sockaddr_storage client_sa;
2284       socklen_t client_sa_size;
2285       pthread_t tid;
2286       pthread_attr_t attr;
2287
2288       if (pollfds[i].revents == 0)
2289         continue;
2290
2291       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2292       {
2293         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2294             "poll(2) returned something unexpected for listen FD #%i.",
2295             pollfds[i].fd);
2296         continue;
2297       }
2298
2299       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2300       if (client_sock == NULL)
2301       {
2302         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2303         continue;
2304       }
2305       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2306
2307       client_sa_size = sizeof (client_sa);
2308       client_sock->fd = accept (pollfds[i].fd,
2309           (struct sockaddr *) &client_sa, &client_sa_size);
2310       if (client_sock->fd < 0)
2311       {
2312         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2313         free(client_sock);
2314         continue;
2315       }
2316
2317       pthread_attr_init (&attr);
2318       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2319
2320       status = pthread_create (&tid, &attr, connection_thread_main,
2321                                client_sock);
2322       if (status != 0)
2323       {
2324         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2325         close_connection(client_sock);
2326         continue;
2327       }
2328     } /* for (pollfds_num) */
2329   } /* while (do_shutdown == 0) */
2330
2331   RRDD_LOG(LOG_INFO, "starting shutdown");
2332
2333   close_listen_sockets ();
2334
2335   pthread_mutex_lock (&connection_threads_lock);
2336   while (connection_threads_num > 0)
2337   {
2338     pthread_t wait_for;
2339
2340     wait_for = connection_threads[0];
2341
2342     pthread_mutex_unlock (&connection_threads_lock);
2343     pthread_join (wait_for, /* retval = */ NULL);
2344     pthread_mutex_lock (&connection_threads_lock);
2345   }
2346   pthread_mutex_unlock (&connection_threads_lock);
2347
2348   return (NULL);
2349 } /* }}} void *listen_thread_main */
2350
2351 static int daemonize (void) /* {{{ */
2352 {
2353   int pid_fd;
2354   char *base_dir;
2355
2356   daemon_uid = geteuid();
2357
2358   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2359   if (pid_fd < 0)
2360     pid_fd = check_pidfile();
2361   if (pid_fd < 0)
2362     return pid_fd;
2363
2364   /* open all the listen sockets */
2365   if (config_listen_address_list_len > 0)
2366   {
2367     for (int i = 0; i < config_listen_address_list_len; i++)
2368       open_listen_socket (config_listen_address_list[i]);
2369   }
2370   else
2371   {
2372     listen_socket_t sock;
2373     memset(&sock, 0, sizeof(sock));
2374     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2375     open_listen_socket (&sock);
2376   }
2377
2378   if (listen_fds_num < 1)
2379   {
2380     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2381     goto error;
2382   }
2383
2384   if (!stay_foreground)
2385   {
2386     pid_t child;
2387
2388     child = fork ();
2389     if (child < 0)
2390     {
2391       fprintf (stderr, "daemonize: fork(2) failed.\n");
2392       goto error;
2393     }
2394     else if (child > 0)
2395       exit(0);
2396
2397     /* Become session leader */
2398     setsid ();
2399
2400     /* Open the first three file descriptors to /dev/null */
2401     close (2);
2402     close (1);
2403     close (0);
2404
2405     open ("/dev/null", O_RDWR);
2406     dup (0);
2407     dup (0);
2408   } /* if (!stay_foreground) */
2409
2410   /* Change into the /tmp directory. */
2411   base_dir = (config_base_dir != NULL)
2412     ? config_base_dir
2413     : "/tmp";
2414
2415   if (chdir (base_dir) != 0)
2416   {
2417     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2418     goto error;
2419   }
2420
2421   install_signal_handlers();
2422
2423   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2424   RRDD_LOG(LOG_INFO, "starting up");
2425
2426   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2427   if (cache_tree == NULL)
2428   {
2429     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2430     goto error;
2431   }
2432
2433   return write_pidfile (pid_fd);
2434
2435 error:
2436   remove_pidfile();
2437   return -1;
2438 } /* }}} int daemonize */
2439
2440 static int cleanup (void) /* {{{ */
2441 {
2442   do_shutdown++;
2443
2444   pthread_cond_signal (&cache_cond);
2445   pthread_join (queue_thread, /* return = */ NULL);
2446
2447   remove_pidfile ();
2448
2449   RRDD_LOG(LOG_INFO, "goodbye");
2450   closelog ();
2451
2452   return (0);
2453 } /* }}} int cleanup */
2454
2455 static int read_options (int argc, char **argv) /* {{{ */
2456 {
2457   int option;
2458   int status = 0;
2459
2460   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2461   {
2462     switch (option)
2463     {
2464       case 'g':
2465         stay_foreground=1;
2466         break;
2467
2468       case 'L':
2469       case 'l':
2470       {
2471         listen_socket_t **temp;
2472         listen_socket_t *new;
2473
2474         new = malloc(sizeof(listen_socket_t));
2475         if (new == NULL)
2476         {
2477           fprintf(stderr, "read_options: malloc failed.\n");
2478           return(2);
2479         }
2480         memset(new, 0, sizeof(listen_socket_t));
2481
2482         temp = (listen_socket_t **) realloc (config_listen_address_list,
2483             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2484         if (temp == NULL)
2485         {
2486           fprintf (stderr, "read_options: realloc failed.\n");
2487           return (2);
2488         }
2489         config_listen_address_list = temp;
2490
2491         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2492         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2493
2494         temp[config_listen_address_list_len] = new;
2495         config_listen_address_list_len++;
2496       }
2497       break;
2498
2499       case 'f':
2500       {
2501         int temp;
2502
2503         temp = atoi (optarg);
2504         if (temp > 0)
2505           config_flush_interval = temp;
2506         else
2507         {
2508           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2509           status = 3;
2510         }
2511       }
2512       break;
2513
2514       case 'w':
2515       {
2516         int temp;
2517
2518         temp = atoi (optarg);
2519         if (temp > 0)
2520           config_write_interval = temp;
2521         else
2522         {
2523           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2524           status = 2;
2525         }
2526       }
2527       break;
2528
2529       case 'z':
2530       {
2531         int temp;
2532
2533         temp = atoi(optarg);
2534         if (temp > 0)
2535           config_write_jitter = temp;
2536         else
2537         {
2538           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2539           status = 2;
2540         }
2541
2542         break;
2543       }
2544
2545       case 'B':
2546         config_write_base_only = 1;
2547         break;
2548
2549       case 'b':
2550       {
2551         size_t len;
2552         char base_realpath[PATH_MAX];
2553
2554         if (config_base_dir != NULL)
2555           free (config_base_dir);
2556         config_base_dir = strdup (optarg);
2557         if (config_base_dir == NULL)
2558         {
2559           fprintf (stderr, "read_options: strdup failed.\n");
2560           return (3);
2561         }
2562
2563         /* make sure that the base directory is not resolved via
2564          * symbolic links.  this makes some performance-enhancing
2565          * assumptions possible (we don't have to resolve paths
2566          * that start with a "/")
2567          */
2568         if (realpath(config_base_dir, base_realpath) == NULL)
2569         {
2570           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2571           return 5;
2572         }
2573         else if (strncmp(config_base_dir,
2574                          base_realpath, sizeof(base_realpath)) != 0)
2575         {
2576           fprintf(stderr,
2577                   "Base directory (-b) resolved via file system links!\n"
2578                   "Please consult rrdcached '-b' documentation!\n"
2579                   "Consider specifying the real directory (%s)\n",
2580                   base_realpath);
2581           return 5;
2582         }
2583
2584         len = strlen (config_base_dir);
2585         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2586         {
2587           config_base_dir[len - 1] = 0;
2588           len--;
2589         }
2590
2591         if (len < 1)
2592         {
2593           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2594           return (4);
2595         }
2596
2597         _config_base_dir_len = len;
2598       }
2599       break;
2600
2601       case 'p':
2602       {
2603         if (config_pid_file != NULL)
2604           free (config_pid_file);
2605         config_pid_file = strdup (optarg);
2606         if (config_pid_file == NULL)
2607         {
2608           fprintf (stderr, "read_options: strdup failed.\n");
2609           return (3);
2610         }
2611       }
2612       break;
2613
2614       case 'F':
2615         config_flush_at_shutdown = 1;
2616         break;
2617
2618       case 'j':
2619       {
2620         struct stat statbuf;
2621         const char *dir = optarg;
2622
2623         status = stat(dir, &statbuf);
2624         if (status != 0)
2625         {
2626           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2627           return 6;
2628         }
2629
2630         if (!S_ISDIR(statbuf.st_mode)
2631             || access(dir, R_OK|W_OK|X_OK) != 0)
2632         {
2633           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2634                   errno ? rrd_strerror(errno) : "");
2635           return 6;
2636         }
2637
2638         journal_cur = malloc(PATH_MAX + 1);
2639         journal_old = malloc(PATH_MAX + 1);
2640         if (journal_cur == NULL || journal_old == NULL)
2641         {
2642           fprintf(stderr, "malloc failure for journal files\n");
2643           return 6;
2644         }
2645         else 
2646         {
2647           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2648           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2649         }
2650       }
2651       break;
2652
2653       case 'h':
2654       case '?':
2655         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2656             "\n"
2657             "Usage: rrdcached [options]\n"
2658             "\n"
2659             "Valid options are:\n"
2660             "  -l <address>  Socket address to listen to.\n"
2661             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2662             "  -w <seconds>  Interval in which to write data.\n"
2663             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2664             "  -f <seconds>  Interval in which to flush dead data.\n"
2665             "  -p <file>     Location of the PID-file.\n"
2666             "  -b <dir>      Base directory to change to.\n"
2667             "  -B            Restrict file access to paths within -b <dir>\n"
2668             "  -g            Do not fork and run in the foreground.\n"
2669             "  -j <dir>      Directory in which to create the journal files.\n"
2670             "  -F            Always flush all updates at shutdown\n"
2671             "\n"
2672             "For more information and a detailed description of all options "
2673             "please refer\n"
2674             "to the rrdcached(1) manual page.\n",
2675             VERSION);
2676         status = -1;
2677         break;
2678     } /* switch (option) */
2679   } /* while (getopt) */
2680
2681   /* advise the user when values are not sane */
2682   if (config_flush_interval < 2 * config_write_interval)
2683     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2684             " 2x write interval (-w) !\n");
2685   if (config_write_jitter > config_write_interval)
2686     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2687             " write interval (-w) !\n");
2688
2689   if (config_write_base_only && config_base_dir == NULL)
2690     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2691             "  Consult the rrdcached documentation\n");
2692
2693   if (journal_cur == NULL)
2694     config_flush_at_shutdown = 1;
2695
2696   return (status);
2697 } /* }}} int read_options */
2698
2699 int main (int argc, char **argv)
2700 {
2701   int status;
2702
2703   status = read_options (argc, argv);
2704   if (status != 0)
2705   {
2706     if (status < 0)
2707       status = 0;
2708     return (status);
2709   }
2710
2711   status = daemonize ();
2712   if (status != 0)
2713   {
2714     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2715     return (1);
2716   }
2717
2718   journal_init();
2719
2720   /* start the queue thread */
2721   memset (&queue_thread, 0, sizeof (queue_thread));
2722   status = pthread_create (&queue_thread,
2723                            NULL, /* attr */
2724                            queue_thread_main,
2725                            NULL); /* args */
2726   if (status != 0)
2727   {
2728     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2729     cleanup();
2730     return (1);
2731   }
2732
2733   listen_thread_main (NULL);
2734   cleanup ();
2735
2736   return (0);
2737 } /* int main */
2738
2739 /*
2740  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2741  */