511b04e02a3c923b4a851aa5ae7026b7355e28d7
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
149
150 struct callback_flush_data_s
151 {
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
158
159 enum queue_side_e
160 {
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
165
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
169
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
175
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
178
179 static int do_shutdown = 0;
180
181 static pthread_t queue_thread;
182
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
186
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
193
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
202
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
205
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
223
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
233
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235 {
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
238
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240 {
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
243
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
249
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
255
256 static void install_signal_handlers(void) /* {{{ */
257 {
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
265
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
270
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
274
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
278
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
282
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
286
287 } /* }}} void install_signal_handlers */
288
289 static int open_pidfile(void) /* {{{ */
290 {
291   int fd;
292   char *file;
293
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
297
298   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301             file, rrd_strerror(errno));
302
303   return(fd);
304 } /* }}} static int open_pidfile */
305
306 static int write_pidfile (int fd) /* {{{ */
307 {
308   pid_t pid;
309   FILE *fh;
310
311   pid = getpid ();
312
313   fh = fdopen (fd, "w");
314   if (fh == NULL)
315   {
316     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
317     close(fd);
318     return (-1);
319   }
320
321   fprintf (fh, "%i\n", (int) pid);
322   fclose (fh);
323
324   return (0);
325 } /* }}} int write_pidfile */
326
327 static int remove_pidfile (void) /* {{{ */
328 {
329   char *file;
330   int status;
331
332   file = (config_pid_file != NULL)
333     ? config_pid_file
334     : LOCALSTATEDIR "/run/rrdcached.pid";
335
336   status = unlink (file);
337   if (status == 0)
338     return (0);
339   return (errno);
340 } /* }}} int remove_pidfile */
341
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
343 {
344   char *eol;
345
346   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347                sock->next_read - sock->next_cmd);
348
349   if (eol == NULL)
350   {
351     /* no commands left, move remainder back to front of rbuf */
352     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353             sock->next_read - sock->next_cmd);
354     sock->next_read -= sock->next_cmd;
355     sock->next_cmd = 0;
356     *len = 0;
357     return NULL;
358   }
359   else
360   {
361     char *cmd = sock->rbuf + sock->next_cmd;
362     *eol = '\0';
363
364     sock->next_cmd = eol - sock->rbuf + 1;
365
366     if (eol > sock->rbuf && *(eol-1) == '\r')
367       *(--eol) = '\0'; /* handle "\r\n" EOL */
368
369     *len = eol - cmd;
370
371     return cmd;
372   }
373
374   /* NOTREACHED */
375   assert(1==0);
376 }
377
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
380 {
381   char *new_buf;
382
383   assert(sock != NULL);
384
385   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
386   if (new_buf == NULL)
387   {
388     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
389     return -1;
390   }
391
392   strncpy(new_buf + sock->wbuf_len, str, len + 1);
393
394   sock->wbuf = new_buf;
395   sock->wbuf_len += len;
396
397   return 0;
398 } /* }}} static int add_to_wbuf */
399
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
402 {
403   va_list argp;
404   char buffer[CMD_MAX];
405   int len;
406
407   if (sock == NULL) return 0; /* journal replay mode */
408   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
409
410   va_start(argp, fmt);
411 #ifdef HAVE_VSNPRINTF
412   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
413 #else
414   len = vsprintf(buffer, fmt, argp);
415 #endif
416   va_end(argp);
417   if (len < 0)
418   {
419     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
420     return -1;
421   }
422
423   return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
425
426 static int count_lines(char *str) /* {{{ */
427 {
428   int lines = 0;
429
430   if (str != NULL)
431   {
432     while ((str = strchr(str, '\n')) != NULL)
433     {
434       ++lines;
435       ++str;
436     }
437   }
438
439   return lines;
440 } /* }}} static int count_lines */
441
442 /* send the response back to the user.
443  * returns 0 on success, -1 on error
444  * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446                           char *fmt, ...) /* {{{ */
447 {
448   va_list argp;
449   char buffer[CMD_MAX];
450   int lines;
451   ssize_t wrote;
452   int rclen, len;
453
454   if (sock == NULL) return rc;  /* journal replay mode */
455
456   if (sock->batch_start)
457   {
458     if (rc == RESP_OK)
459       return rc; /* no response on success during BATCH */
460     lines = sock->batch_cmd;
461   }
462   else if (rc == RESP_OK)
463     lines = count_lines(sock->wbuf);
464   else
465     lines = -1;
466
467   rclen = sprintf(buffer, "%d ", lines);
468   va_start(argp, fmt);
469 #ifdef HAVE_VSNPRINTF
470   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
471 #else
472   len = vsprintf(buffer+rclen, fmt, argp);
473 #endif
474   va_end(argp);
475   if (len < 0)
476     return -1;
477
478   len += rclen;
479
480   /* append the result to the wbuf, don't write to the user */
481   if (sock->batch_start)
482     return add_to_wbuf(sock, buffer, len);
483
484   /* first write must be complete */
485   if (len != write(sock->fd, buffer, len))
486   {
487     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
488     return -1;
489   }
490
491   if (sock->wbuf != NULL && rc == RESP_OK)
492   {
493     wrote = 0;
494     while (wrote < sock->wbuf_len)
495     {
496       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
497       if (wb <= 0)
498       {
499         RRDD_LOG(LOG_INFO, "send_response: could not write results");
500         return -1;
501       }
502       wrote += wb;
503     }
504   }
505
506   free(sock->wbuf); sock->wbuf = NULL;
507   sock->wbuf_len = 0;
508
509   return 0;
510 } /* }}} */
511
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
513 {
514   ci->values = NULL;
515   ci->values_num = 0;
516
517   ci->last_flush_time = when;
518   if (config_write_jitter > 0)
519     ci->last_flush_time += (random() % config_write_jitter);
520 }
521
522 /* remove_from_queue
523  * remove a "cache_item_t" item from the queue.
524  * must hold 'cache_lock' when calling this
525  */
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
527 {
528   if (ci == NULL) return;
529
530   if (ci->prev == NULL)
531     cache_queue_head = ci->next; /* reset head */
532   else
533     ci->prev->next = ci->next;
534
535   if (ci->next == NULL)
536     cache_queue_tail = ci->prev; /* reset the tail */
537   else
538     ci->next->prev = ci->prev;
539
540   ci->next = ci->prev = NULL;
541   ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
543
544 /* remove an entry from the tree and free all its resources.
545  * must hold 'cache lock' while calling this.
546  * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
548 {
549   cache_item_t *ci;
550
551   ci = g_tree_lookup(cache_tree, file);
552   if (ci == NULL)
553     return ENOENT;
554
555   g_tree_remove (cache_tree, file);
556   remove_from_queue(ci);
557
558   for (int i=0; i < ci->values_num; i++)
559     free(ci->values[i]);
560
561   free (ci->values);
562   free (ci->file);
563
564   /* in case anyone is waiting */
565   pthread_cond_broadcast(&ci->flushed);
566
567   free (ci);
568
569   return 0;
570 } /* }}} static int forget_file */
571
572 /*
573  * enqueue_cache_item:
574  * `cache_lock' must be acquired before calling this function!
575  */
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
577     queue_side_t side)
578 {
579   if (ci == NULL)
580     return (-1);
581
582   if (ci->values_num == 0)
583     return (0);
584
585   if (side == HEAD)
586   {
587     if (cache_queue_head == ci)
588       return 0;
589
590     /* remove from the double linked list */
591     if (ci->flags & CI_FLAGS_IN_QUEUE)
592       remove_from_queue(ci);
593
594     ci->prev = NULL;
595     ci->next = cache_queue_head;
596     if (ci->next != NULL)
597       ci->next->prev = ci;
598     cache_queue_head = ci;
599
600     if (cache_queue_tail == NULL)
601       cache_queue_tail = cache_queue_head;
602   }
603   else /* (side == TAIL) */
604   {
605     /* We don't move values back in the list.. */
606     if (ci->flags & CI_FLAGS_IN_QUEUE)
607       return (0);
608
609     assert (ci->next == NULL);
610     assert (ci->prev == NULL);
611
612     ci->prev = cache_queue_tail;
613
614     if (cache_queue_tail == NULL)
615       cache_queue_head = ci;
616     else
617       cache_queue_tail->next = ci;
618
619     cache_queue_tail = ci;
620   }
621
622   ci->flags |= CI_FLAGS_IN_QUEUE;
623
624   pthread_cond_broadcast(&cache_cond);
625   pthread_mutex_lock (&stats_lock);
626   stats_queue_length++;
627   pthread_mutex_unlock (&stats_lock);
628
629   return (0);
630 } /* }}} int enqueue_cache_item */
631
632 /*
633  * tree_callback_flush:
634  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635  * while this is in progress.
636  */
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
638     gpointer data)
639 {
640   cache_item_t *ci;
641   callback_flush_data_t *cfd;
642
643   ci = (cache_item_t *) value;
644   cfd = (callback_flush_data_t *) data;
645
646   if ((ci->last_flush_time <= cfd->abs_timeout)
647       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648       && (ci->values_num > 0))
649   {
650     enqueue_cache_item (ci, TAIL);
651   }
652   else if ((do_shutdown != 0)
653       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654       && (ci->values_num > 0))
655   {
656     enqueue_cache_item (ci, TAIL);
657   }
658   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660       && (ci->values_num <= 0))
661   {
662     char **temp;
663
664     temp = (char **) realloc (cfd->keys,
665         sizeof (char *) * (cfd->keys_num + 1));
666     if (temp == NULL)
667     {
668       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
669       return (FALSE);
670     }
671     cfd->keys = temp;
672     /* Make really sure this points to the _same_ place */
673     assert ((char *) key == ci->file);
674     cfd->keys[cfd->keys_num] = (char *) key;
675     cfd->keys_num++;
676   }
677
678   return (FALSE);
679 } /* }}} gboolean tree_callback_flush */
680
681 static int flush_old_values (int max_age)
682 {
683   callback_flush_data_t cfd;
684   size_t k;
685
686   memset (&cfd, 0, sizeof (cfd));
687   /* Pass the current time as user data so that we don't need to call
688    * `time' for each node. */
689   cfd.now = time (NULL);
690   cfd.keys = NULL;
691   cfd.keys_num = 0;
692
693   if (max_age > 0)
694     cfd.abs_timeout = cfd.now - max_age;
695   else
696     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
697
698   /* `tree_callback_flush' will return the keys of all values that haven't
699    * been touched in the last `config_flush_interval' seconds in `cfd'.
700    * The char*'s in this array point to the same memory as ci->file, so we
701    * don't need to free them separately. */
702   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
703
704   for (k = 0; k < cfd.keys_num; k++)
705   {
706     /* should never fail, since we have held the cache_lock
707      * the entire time */
708     assert( forget_file(cfd.keys[k]) == 0 );
709   }
710
711   if (cfd.keys != NULL)
712   {
713     free (cfd.keys);
714     cfd.keys = NULL;
715   }
716
717   return (0);
718 } /* int flush_old_values */
719
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
721 {
722   struct timeval now;
723   struct timespec next_flush;
724   int final_flush = 0; /* make sure we only flush once on shutdown */
725
726   gettimeofday (&now, NULL);
727   next_flush.tv_sec = now.tv_sec + config_flush_interval;
728   next_flush.tv_nsec = 1000 * now.tv_usec;
729
730   pthread_mutex_lock (&cache_lock);
731   while ((do_shutdown == 0) || (cache_queue_head != NULL))
732   {
733     cache_item_t *ci;
734     char *file;
735     char **values;
736     int values_num;
737     int status;
738     int i;
739
740     /* First, check if it's time to do the cache flush. */
741     gettimeofday (&now, NULL);
742     if ((now.tv_sec > next_flush.tv_sec)
743         || ((now.tv_sec == next_flush.tv_sec)
744           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
745     {
746       /* Flush all values that haven't been written in the last
747        * `config_write_interval' seconds. */
748       flush_old_values (config_write_interval);
749
750       /* Determine the time of the next cache flush. */
751       next_flush.tv_sec =
752         now.tv_sec + next_flush.tv_sec % config_flush_interval;
753
754       /* unlock the cache while we rotate so we don't block incoming
755        * updates if the fsync() blocks on disk I/O */
756       pthread_mutex_unlock(&cache_lock);
757       journal_rotate();
758       pthread_mutex_lock(&cache_lock);
759     }
760
761     /* Now, check if there's something to store away. If not, wait until
762      * something comes in or it's time to do the cache flush.  if we are
763      * shutting down, do not wait around.  */
764     if (cache_queue_head == NULL && !do_shutdown)
765     {
766       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767       if ((status != 0) && (status != ETIMEDOUT))
768       {
769         RRDD_LOG (LOG_ERR, "queue_thread_main: "
770             "pthread_cond_timedwait returned %i.", status);
771       }
772     }
773
774     /* We're about to shut down */
775     if (do_shutdown != 0 && !final_flush++)
776     {
777       if (config_flush_at_shutdown)
778         flush_old_values (-1); /* flush everything */
779       else
780         break;
781     }
782
783     /* Check if a value has arrived. This may be NULL if we timed out or there
784      * was an interrupt such as a signal. */
785     if (cache_queue_head == NULL)
786       continue;
787
788     ci = cache_queue_head;
789
790     /* copy the relevant parts */
791     file = strdup (ci->file);
792     if (file == NULL)
793     {
794       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
795       continue;
796     }
797
798     assert(ci->values != NULL);
799     assert(ci->values_num > 0);
800
801     values = ci->values;
802     values_num = ci->values_num;
803
804     wipe_ci_values(ci, time(NULL));
805     remove_from_queue(ci);
806
807     pthread_mutex_lock (&stats_lock);
808     assert (stats_queue_length > 0);
809     stats_queue_length--;
810     pthread_mutex_unlock (&stats_lock);
811
812     pthread_mutex_unlock (&cache_lock);
813
814     rrd_clear_error ();
815     status = rrd_update_r (file, NULL, values_num, (void *) values);
816     if (status != 0)
817     {
818       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819           "rrd_update_r (%s) failed with status %i. (%s)",
820           file, status, rrd_get_error());
821     }
822
823     journal_write("wrote", file);
824     pthread_cond_broadcast(&ci->flushed);
825
826     for (i = 0; i < values_num; i++)
827       free (values[i]);
828
829     free(values);
830     free(file);
831
832     if (status == 0)
833     {
834       pthread_mutex_lock (&stats_lock);
835       stats_updates_written++;
836       stats_data_sets_written += values_num;
837       pthread_mutex_unlock (&stats_lock);
838     }
839
840     pthread_mutex_lock (&cache_lock);
841
842     /* We're about to shut down */
843     if (do_shutdown != 0 && !final_flush++)
844     {
845       if (config_flush_at_shutdown)
846           flush_old_values (-1); /* flush everything */
847       else
848         break;
849     }
850   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851   pthread_mutex_unlock (&cache_lock);
852
853   if (config_flush_at_shutdown)
854   {
855     assert(cache_queue_head == NULL);
856     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
857   }
858
859   journal_done();
860
861   return (NULL);
862 } /* }}} void *queue_thread_main */
863
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865     size_t *buffer_size_ret, char **field_ret)
866 {
867   char *buffer;
868   size_t buffer_pos;
869   size_t buffer_size;
870   char *field;
871   size_t field_size;
872   int status;
873
874   buffer = *buffer_ret;
875   buffer_pos = 0;
876   buffer_size = *buffer_size_ret;
877   field = *buffer_ret;
878   field_size = 0;
879
880   if (buffer_size <= 0)
881     return (-1);
882
883   /* This is ensured by `handle_request'. */
884   assert (buffer[buffer_size - 1] == '\0');
885
886   status = -1;
887   while (buffer_pos < buffer_size)
888   {
889     /* Check for end-of-field or end-of-buffer */
890     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
891     {
892       field[field_size] = 0;
893       field_size++;
894       buffer_pos++;
895       status = 0;
896       break;
897     }
898     /* Handle escaped characters. */
899     else if (buffer[buffer_pos] == '\\')
900     {
901       if (buffer_pos >= (buffer_size - 1))
902         break;
903       buffer_pos++;
904       field[field_size] = buffer[buffer_pos];
905       field_size++;
906       buffer_pos++;
907     }
908     /* Normal operation */ 
909     else
910     {
911       field[field_size] = buffer[buffer_pos];
912       field_size++;
913       buffer_pos++;
914     }
915   } /* while (buffer_pos < buffer_size) */
916
917   if (status != 0)
918     return (status);
919
920   *buffer_ret = buffer + buffer_pos;
921   *buffer_size_ret = buffer_size - buffer_pos;
922   *field_ret = field;
923
924   return (0);
925 } /* }}} int buffer_get_field */
926
927 /* if we're restricting writes to the base directory,
928  * check whether the file falls within the dir
929  * returns 1 if OK, otherwise 0
930  */
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
932 {
933   assert(file != NULL);
934
935   if (!config_write_base_only
936       || sock == NULL /* journal replay */
937       || config_base_dir == NULL)
938     return 1;
939
940   if (strstr(file, "../") != NULL) goto err;
941
942   /* relative paths without "../" are ok */
943   if (*file != '/') return 1;
944
945   /* file must be of the format base + "/" + <1+ char filename> */
946   if (strlen(file) < _config_base_dir_len + 2) goto err;
947   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948   if (*(file + _config_base_dir_len) != '/') goto err;
949
950   return 1;
951
952 err:
953   if (sock != NULL && sock->fd >= 0)
954     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
955
956   return 0;
957 } /* }}} static int check_file_access */
958
959 /* when using a base dir, convert relative paths to absolute paths.
960  * if necessary, modifies the "filename" pointer to point
961  * to the new path created in "tmp".  "tmp" is provided
962  * by the caller and sizeof(tmp) must be >= PATH_MAX.
963  *
964  * this allows us to optimize for the expected case (absolute path)
965  * with a no-op.
966  */
967 static void get_abs_path(char **filename, char *tmp)
968 {
969   assert(tmp != NULL);
970   assert(filename != NULL && *filename != NULL);
971
972   if (config_base_dir == NULL || **filename == '/')
973     return;
974
975   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
976   *filename = tmp;
977 } /* }}} static int get_abs_path */
978
979 /* returns 1 if we have the required privilege level,
980  * otherwise issue an error to the user on sock */
981 static int has_privilege (listen_socket_t *sock, /* {{{ */
982                           socket_privilege priv)
983 {
984   if (sock == NULL) /* journal replay */
985     return 1;
986
987   if (sock->privilege >= priv)
988     return 1;
989
990   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
991 } /* }}} static int has_privilege */
992
993 static int flush_file (const char *filename) /* {{{ */
994 {
995   cache_item_t *ci;
996
997   pthread_mutex_lock (&cache_lock);
998
999   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1000   if (ci == NULL)
1001   {
1002     pthread_mutex_unlock (&cache_lock);
1003     return (ENOENT);
1004   }
1005
1006   if (ci->values_num > 0)
1007   {
1008     /* Enqueue at head */
1009     enqueue_cache_item (ci, HEAD);
1010     pthread_cond_wait(&ci->flushed, &cache_lock);
1011   }
1012
1013   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1014    * may have been purged during our cond_wait() */
1015
1016   pthread_mutex_unlock(&cache_lock);
1017
1018   return (0);
1019 } /* }}} int flush_file */
1020
1021 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1022     char *buffer, size_t buffer_size)
1023 {
1024   int status;
1025   char **help_text;
1026   char *command;
1027
1028   char *help_help[2] =
1029   {
1030     "Command overview\n"
1031     ,
1032     "HELP [<command>]\n"
1033     "FLUSH <filename>\n"
1034     "FLUSHALL\n"
1035     "PENDING <filename>\n"
1036     "FORGET <filename>\n"
1037     "UPDATE <filename> <values> [<values> ...]\n"
1038     "BATCH\n"
1039     "STATS\n"
1040   };
1041
1042   char *help_flush[2] =
1043   {
1044     "Help for FLUSH\n"
1045     ,
1046     "Usage: FLUSH <filename>\n"
1047     "\n"
1048     "Adds the given filename to the head of the update queue and returns\n"
1049     "after is has been dequeued.\n"
1050   };
1051
1052   char *help_flushall[2] =
1053   {
1054     "Help for FLUSHALL\n"
1055     ,
1056     "Usage: FLUSHALL\n"
1057     "\n"
1058     "Triggers writing of all pending updates.  Returns immediately.\n"
1059   };
1060
1061   char *help_pending[2] =
1062   {
1063     "Help for PENDING\n"
1064     ,
1065     "Usage: PENDING <filename>\n"
1066     "\n"
1067     "Shows any 'pending' updates for a file, in order.\n"
1068     "The updates shown have not yet been written to the underlying RRD file.\n"
1069   };
1070
1071   char *help_forget[2] =
1072   {
1073     "Help for FORGET\n"
1074     ,
1075     "Usage: FORGET <filename>\n"
1076     "\n"
1077     "Removes the file completely from the cache.\n"
1078     "Any pending updates for the file will be lost.\n"
1079   };
1080
1081   char *help_update[2] =
1082   {
1083     "Help for UPDATE\n"
1084     ,
1085     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1086     "\n"
1087     "Adds the given file to the internal cache if it is not yet known and\n"
1088     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1089     "for details.\n"
1090     "\n"
1091     "Each <values> has the following form:\n"
1092     "  <values> = <time>:<value>[:<value>[...]]\n"
1093     "See the rrdupdate(1) manpage for details.\n"
1094   };
1095
1096   char *help_stats[2] =
1097   {
1098     "Help for STATS\n"
1099     ,
1100     "Usage: STATS\n"
1101     "\n"
1102     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1103     "a description of the values.\n"
1104   };
1105
1106   char *help_batch[2] =
1107   {
1108     "Help for BATCH\n"
1109     ,
1110     "The 'BATCH' command permits the client to initiate a bulk load\n"
1111     "   of commands to rrdcached.\n"
1112     "\n"
1113     "Usage:\n"
1114     "\n"
1115     "    client: BATCH\n"
1116     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1117     "    client: command #1\n"
1118     "    client: command #2\n"
1119     "    client: ... and so on\n"
1120     "    client: .\n"
1121     "    server: 2 errors\n"
1122     "    server: 7 message for command #7\n"
1123     "    server: 9 message for command #9\n"
1124     "\n"
1125     "For more information, consult the rrdcached(1) documentation.\n"
1126   };
1127
1128   status = buffer_get_field (&buffer, &buffer_size, &command);
1129   if (status != 0)
1130     help_text = help_help;
1131   else
1132   {
1133     if (strcasecmp (command, "update") == 0)
1134       help_text = help_update;
1135     else if (strcasecmp (command, "flush") == 0)
1136       help_text = help_flush;
1137     else if (strcasecmp (command, "flushall") == 0)
1138       help_text = help_flushall;
1139     else if (strcasecmp (command, "pending") == 0)
1140       help_text = help_pending;
1141     else if (strcasecmp (command, "forget") == 0)
1142       help_text = help_forget;
1143     else if (strcasecmp (command, "stats") == 0)
1144       help_text = help_stats;
1145     else if (strcasecmp (command, "batch") == 0)
1146       help_text = help_batch;
1147     else
1148       help_text = help_help;
1149   }
1150
1151   add_response_info(sock, help_text[1]);
1152   return send_response(sock, RESP_OK, help_text[0]);
1153 } /* }}} int handle_request_help */
1154
1155 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1156 {
1157   uint64_t copy_queue_length;
1158   uint64_t copy_updates_received;
1159   uint64_t copy_flush_received;
1160   uint64_t copy_updates_written;
1161   uint64_t copy_data_sets_written;
1162   uint64_t copy_journal_bytes;
1163   uint64_t copy_journal_rotate;
1164
1165   uint64_t tree_nodes_number;
1166   uint64_t tree_depth;
1167
1168   pthread_mutex_lock (&stats_lock);
1169   copy_queue_length       = stats_queue_length;
1170   copy_updates_received   = stats_updates_received;
1171   copy_flush_received     = stats_flush_received;
1172   copy_updates_written    = stats_updates_written;
1173   copy_data_sets_written  = stats_data_sets_written;
1174   copy_journal_bytes      = stats_journal_bytes;
1175   copy_journal_rotate     = stats_journal_rotate;
1176   pthread_mutex_unlock (&stats_lock);
1177
1178   pthread_mutex_lock (&cache_lock);
1179   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1180   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1181   pthread_mutex_unlock (&cache_lock);
1182
1183   add_response_info(sock,
1184                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1185   add_response_info(sock,
1186                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1187   add_response_info(sock,
1188                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1189   add_response_info(sock,
1190                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1191   add_response_info(sock,
1192                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1193   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1194   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1195   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1196   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1197
1198   send_response(sock, RESP_OK, "Statistics follow\n");
1199
1200   return (0);
1201 } /* }}} int handle_request_stats */
1202
1203 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1204     char *buffer, size_t buffer_size)
1205 {
1206   char *file, file_tmp[PATH_MAX];
1207   int status;
1208
1209   status = buffer_get_field (&buffer, &buffer_size, &file);
1210   if (status != 0)
1211   {
1212     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1213   }
1214   else
1215   {
1216     pthread_mutex_lock(&stats_lock);
1217     stats_flush_received++;
1218     pthread_mutex_unlock(&stats_lock);
1219
1220     get_abs_path(&file, file_tmp);
1221     if (!check_file_access(file, sock)) return 0;
1222
1223     status = flush_file (file);
1224     if (status == 0)
1225       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1226     else if (status == ENOENT)
1227     {
1228       /* no file in our tree; see whether it exists at all */
1229       struct stat statbuf;
1230
1231       memset(&statbuf, 0, sizeof(statbuf));
1232       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1233         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1234       else
1235         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1236     }
1237     else if (status < 0)
1238       return send_response(sock, RESP_ERR, "Internal error.\n");
1239     else
1240       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1241   }
1242
1243   /* NOTREACHED */
1244   assert(1==0);
1245 } /* }}} int handle_request_flush */
1246
1247 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1248 {
1249   int status;
1250
1251   status = has_privilege(sock, PRIV_HIGH);
1252   if (status <= 0)
1253     return status;
1254
1255   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1256
1257   pthread_mutex_lock(&cache_lock);
1258   flush_old_values(-1);
1259   pthread_mutex_unlock(&cache_lock);
1260
1261   return send_response(sock, RESP_OK, "Started flush.\n");
1262 } /* }}} static int handle_request_flushall */
1263
1264 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1265                                   char *buffer, size_t buffer_size)
1266 {
1267   int status;
1268   char *file, file_tmp[PATH_MAX];
1269   cache_item_t *ci;
1270
1271   status = buffer_get_field(&buffer, &buffer_size, &file);
1272   if (status != 0)
1273     return send_response(sock, RESP_ERR,
1274                          "Usage: PENDING <filename>\n");
1275
1276   status = has_privilege(sock, PRIV_HIGH);
1277   if (status <= 0)
1278     return status;
1279
1280   get_abs_path(&file, file_tmp);
1281
1282   pthread_mutex_lock(&cache_lock);
1283   ci = g_tree_lookup(cache_tree, file);
1284   if (ci == NULL)
1285   {
1286     pthread_mutex_unlock(&cache_lock);
1287     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1288   }
1289
1290   for (int i=0; i < ci->values_num; i++)
1291     add_response_info(sock, "%s\n", ci->values[i]);
1292
1293   pthread_mutex_unlock(&cache_lock);
1294   return send_response(sock, RESP_OK, "updates pending\n");
1295 } /* }}} static int handle_request_pending */
1296
1297 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1298                                  char *buffer, size_t buffer_size)
1299 {
1300   int status;
1301   char *file, file_tmp[PATH_MAX];
1302
1303   status = buffer_get_field(&buffer, &buffer_size, &file);
1304   if (status != 0)
1305     return send_response(sock, RESP_ERR,
1306                          "Usage: FORGET <filename>\n");
1307
1308   status = has_privilege(sock, PRIV_HIGH);
1309   if (status <= 0)
1310     return status;
1311
1312   get_abs_path(&file, file_tmp);
1313   if (!check_file_access(file, sock)) return 0;
1314
1315   pthread_mutex_lock(&cache_lock);
1316   status = forget_file(file);
1317   pthread_mutex_unlock(&cache_lock);
1318
1319   if (status == 0)
1320   {
1321     if (sock != NULL)
1322       journal_write("forget", file);
1323
1324     return send_response(sock, RESP_OK, "Gone!\n");
1325   }
1326   else
1327     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1328                          status < 0 ? "Internal error" : rrd_strerror(status));
1329
1330   /* NOTREACHED */
1331   assert(1==0);
1332 } /* }}} static int handle_request_forget */
1333
1334 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1335                                   time_t now,
1336                                   char *buffer, size_t buffer_size)
1337 {
1338   char *file, file_tmp[PATH_MAX];
1339   int values_num = 0;
1340   int bad_timestamps = 0;
1341   int status;
1342   char orig_buf[CMD_MAX];
1343
1344   cache_item_t *ci;
1345
1346   status = has_privilege(sock, PRIV_HIGH);
1347   if (status <= 0)
1348     return status;
1349
1350   /* save it for the journal later */
1351   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1352
1353   status = buffer_get_field (&buffer, &buffer_size, &file);
1354   if (status != 0)
1355     return send_response(sock, RESP_ERR,
1356                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1357
1358   pthread_mutex_lock(&stats_lock);
1359   stats_updates_received++;
1360   pthread_mutex_unlock(&stats_lock);
1361
1362   get_abs_path(&file, file_tmp);
1363   if (!check_file_access(file, sock)) return 0;
1364
1365   pthread_mutex_lock (&cache_lock);
1366   ci = g_tree_lookup (cache_tree, file);
1367
1368   if (ci == NULL) /* {{{ */
1369   {
1370     struct stat statbuf;
1371
1372     /* don't hold the lock while we setup; stat(2) might block */
1373     pthread_mutex_unlock(&cache_lock);
1374
1375     memset (&statbuf, 0, sizeof (statbuf));
1376     status = stat (file, &statbuf);
1377     if (status != 0)
1378     {
1379       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1380
1381       status = errno;
1382       if (status == ENOENT)
1383         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1384       else
1385         return send_response(sock, RESP_ERR,
1386                              "stat failed with error %i.\n", status);
1387     }
1388     if (!S_ISREG (statbuf.st_mode))
1389       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1390
1391     if (access(file, R_OK|W_OK) != 0)
1392       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1393                            file, rrd_strerror(errno));
1394
1395     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1396     if (ci == NULL)
1397     {
1398       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1399
1400       return send_response(sock, RESP_ERR, "malloc failed.\n");
1401     }
1402     memset (ci, 0, sizeof (cache_item_t));
1403
1404     ci->file = strdup (file);
1405     if (ci->file == NULL)
1406     {
1407       free (ci);
1408       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1409
1410       return send_response(sock, RESP_ERR, "strdup failed.\n");
1411     }
1412
1413     wipe_ci_values(ci, now);
1414     ci->flags = CI_FLAGS_IN_TREE;
1415
1416     pthread_mutex_lock(&cache_lock);
1417     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1418   } /* }}} */
1419   assert (ci != NULL);
1420
1421   /* don't re-write updates in replay mode */
1422   if (sock != NULL)
1423     journal_write("update", orig_buf);
1424
1425   while (buffer_size > 0)
1426   {
1427     char **temp;
1428     char *value;
1429     time_t stamp;
1430     char *eostamp;
1431
1432     status = buffer_get_field (&buffer, &buffer_size, &value);
1433     if (status != 0)
1434     {
1435       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1436       break;
1437     }
1438
1439     /* make sure update time is always moving forward */
1440     stamp = strtol(value, &eostamp, 10);
1441     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1442     {
1443       ++bad_timestamps;
1444       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1445       continue;
1446     }
1447     else if (stamp <= ci->last_update_stamp)
1448     {
1449       ++bad_timestamps;
1450       add_response_info(sock,
1451                         "illegal attempt to update using time %ld when"
1452                         " last update time is %ld (minimum one second step)\n",
1453                         stamp, ci->last_update_stamp);
1454       continue;
1455     }
1456     else
1457       ci->last_update_stamp = stamp;
1458
1459     temp = (char **) realloc (ci->values,
1460         sizeof (char *) * (ci->values_num + 1));
1461     if (temp == NULL)
1462     {
1463       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1464       continue;
1465     }
1466     ci->values = temp;
1467
1468     ci->values[ci->values_num] = strdup (value);
1469     if (ci->values[ci->values_num] == NULL)
1470     {
1471       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1472       continue;
1473     }
1474     ci->values_num++;
1475
1476     values_num++;
1477   }
1478
1479   if (((now - ci->last_flush_time) >= config_write_interval)
1480       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1481       && (ci->values_num > 0))
1482   {
1483     enqueue_cache_item (ci, TAIL);
1484   }
1485
1486   pthread_mutex_unlock (&cache_lock);
1487
1488   if (values_num < 1)
1489   {
1490     /* if we had only one update attempt, then return the full
1491        error message... try to get the most information out
1492        of the limited error space allowed by the protocol
1493     */
1494     if (bad_timestamps == 1)
1495       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1496     else
1497       return send_response(sock, RESP_ERR,
1498                            "No values updated (%d bad timestamps).\n",
1499                            bad_timestamps);
1500   }
1501   else
1502     return send_response(sock, RESP_OK,
1503                          "errors, enqueued %i value(s).\n", values_num);
1504
1505   /* NOTREACHED */
1506   assert(1==0);
1507
1508 } /* }}} int handle_request_update */
1509
1510 /* we came across a "WROTE" entry during journal replay.
1511  * throw away any values that we have accumulated for this file
1512  */
1513 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1514 {
1515   int i;
1516   cache_item_t *ci;
1517   const char *file = buffer;
1518
1519   pthread_mutex_lock(&cache_lock);
1520
1521   ci = g_tree_lookup(cache_tree, file);
1522   if (ci == NULL)
1523   {
1524     pthread_mutex_unlock(&cache_lock);
1525     return (0);
1526   }
1527
1528   if (ci->values)
1529   {
1530     for (i=0; i < ci->values_num; i++)
1531       free(ci->values[i]);
1532
1533     free(ci->values);
1534   }
1535
1536   wipe_ci_values(ci, now);
1537   remove_from_queue(ci);
1538
1539   pthread_mutex_unlock(&cache_lock);
1540   return (0);
1541 } /* }}} int handle_request_wrote */
1542
1543 /* start "BATCH" processing */
1544 static int batch_start (listen_socket_t *sock) /* {{{ */
1545 {
1546   int status;
1547   if (sock->batch_start)
1548     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1549
1550   status = send_response(sock, RESP_OK,
1551                          "Go ahead.  End with dot '.' on its own line.\n");
1552   sock->batch_start = time(NULL);
1553   sock->batch_cmd = 0;
1554
1555   return status;
1556 } /* }}} static int batch_start */
1557
1558 /* finish "BATCH" processing and return results to the client */
1559 static int batch_done (listen_socket_t *sock) /* {{{ */
1560 {
1561   assert(sock->batch_start);
1562   sock->batch_start = 0;
1563   sock->batch_cmd  = 0;
1564   return send_response(sock, RESP_OK, "errors\n");
1565 } /* }}} static int batch_done */
1566
1567 /* if sock==NULL, we are in journal replay mode */
1568 static int handle_request (listen_socket_t *sock, /* {{{ */
1569                            time_t now,
1570                            char *buffer, size_t buffer_size)
1571 {
1572   char *buffer_ptr;
1573   char *command;
1574   int status;
1575
1576   assert (buffer[buffer_size - 1] == '\0');
1577
1578   buffer_ptr = buffer;
1579   command = NULL;
1580   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1581   if (status != 0)
1582   {
1583     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1584     return (-1);
1585   }
1586
1587   if (sock != NULL && sock->batch_start)
1588     sock->batch_cmd++;
1589
1590   if (strcasecmp (command, "update") == 0)
1591     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1592   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1593   {
1594     /* this is only valid in replay mode */
1595     return (handle_request_wrote (buffer_ptr, now));
1596   }
1597   else if (strcasecmp (command, "flush") == 0)
1598     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1599   else if (strcasecmp (command, "flushall") == 0)
1600     return (handle_request_flushall(sock));
1601   else if (strcasecmp (command, "pending") == 0)
1602     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1603   else if (strcasecmp (command, "forget") == 0)
1604     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1605   else if (strcasecmp (command, "stats") == 0)
1606     return (handle_request_stats (sock));
1607   else if (strcasecmp (command, "help") == 0)
1608     return (handle_request_help (sock, buffer_ptr, buffer_size));
1609   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1610     return batch_start(sock);
1611   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1612     return batch_done(sock);
1613   else
1614     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1615
1616   /* NOTREACHED */
1617   assert(1==0);
1618 } /* }}} int handle_request */
1619
1620 /* MUST NOT hold journal_lock before calling this */
1621 static void journal_rotate(void) /* {{{ */
1622 {
1623   FILE *old_fh = NULL;
1624   int new_fd;
1625
1626   if (journal_cur == NULL || journal_old == NULL)
1627     return;
1628
1629   pthread_mutex_lock(&journal_lock);
1630
1631   /* we rotate this way (rename before close) so that the we can release
1632    * the journal lock as fast as possible.  Journal writes to the new
1633    * journal can proceed immediately after the new file is opened.  The
1634    * fclose can then block without affecting new updates.
1635    */
1636   if (journal_fh != NULL)
1637   {
1638     old_fh = journal_fh;
1639     journal_fh = NULL;
1640     rename(journal_cur, journal_old);
1641     ++stats_journal_rotate;
1642   }
1643
1644   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1645                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1646   if (new_fd >= 0)
1647   {
1648     journal_fh = fdopen(new_fd, "a");
1649     if (journal_fh == NULL)
1650       close(new_fd);
1651   }
1652
1653   pthread_mutex_unlock(&journal_lock);
1654
1655   if (old_fh != NULL)
1656     fclose(old_fh);
1657
1658   if (journal_fh == NULL)
1659   {
1660     RRDD_LOG(LOG_CRIT,
1661              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1662              journal_cur, rrd_strerror(errno));
1663
1664     RRDD_LOG(LOG_ERR,
1665              "JOURNALING DISABLED: All values will be flushed at shutdown");
1666     config_flush_at_shutdown = 1;
1667   }
1668
1669 } /* }}} static void journal_rotate */
1670
1671 static void journal_done(void) /* {{{ */
1672 {
1673   if (journal_cur == NULL)
1674     return;
1675
1676   pthread_mutex_lock(&journal_lock);
1677   if (journal_fh != NULL)
1678   {
1679     fclose(journal_fh);
1680     journal_fh = NULL;
1681   }
1682
1683   if (config_flush_at_shutdown)
1684   {
1685     RRDD_LOG(LOG_INFO, "removing journals");
1686     unlink(journal_old);
1687     unlink(journal_cur);
1688   }
1689   else
1690   {
1691     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1692              "journals will be used at next startup");
1693   }
1694
1695   pthread_mutex_unlock(&journal_lock);
1696
1697 } /* }}} static void journal_done */
1698
1699 static int journal_write(char *cmd, char *args) /* {{{ */
1700 {
1701   int chars;
1702
1703   if (journal_fh == NULL)
1704     return 0;
1705
1706   pthread_mutex_lock(&journal_lock);
1707   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1708   pthread_mutex_unlock(&journal_lock);
1709
1710   if (chars > 0)
1711   {
1712     pthread_mutex_lock(&stats_lock);
1713     stats_journal_bytes += chars;
1714     pthread_mutex_unlock(&stats_lock);
1715   }
1716
1717   return chars;
1718 } /* }}} static int journal_write */
1719
1720 static int journal_replay (const char *file) /* {{{ */
1721 {
1722   FILE *fh;
1723   int entry_cnt = 0;
1724   int fail_cnt = 0;
1725   uint64_t line = 0;
1726   char entry[CMD_MAX];
1727   time_t now;
1728
1729   if (file == NULL) return 0;
1730
1731   {
1732     char *reason;
1733     int status = 0;
1734     struct stat statbuf;
1735
1736     memset(&statbuf, 0, sizeof(statbuf));
1737     if (stat(file, &statbuf) != 0)
1738     {
1739       if (errno == ENOENT)
1740         return 0;
1741
1742       reason = "stat error";
1743       status = errno;
1744     }
1745     else if (!S_ISREG(statbuf.st_mode))
1746     {
1747       reason = "not a regular file";
1748       status = EPERM;
1749     }
1750     if (statbuf.st_uid != daemon_uid)
1751     {
1752       reason = "not owned by daemon user";
1753       status = EACCES;
1754     }
1755     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1756     {
1757       reason = "must not be user/group writable";
1758       status = EACCES;
1759     }
1760
1761     if (status != 0)
1762     {
1763       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1764                file, rrd_strerror(status), reason);
1765       return 0;
1766     }
1767   }
1768
1769   fh = fopen(file, "r");
1770   if (fh == NULL)
1771   {
1772     if (errno != ENOENT)
1773       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1774                file, rrd_strerror(errno));
1775     return 0;
1776   }
1777   else
1778     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1779
1780   now = time(NULL);
1781
1782   while(!feof(fh))
1783   {
1784     size_t entry_len;
1785
1786     ++line;
1787     if (fgets(entry, sizeof(entry), fh) == NULL)
1788       break;
1789     entry_len = strlen(entry);
1790
1791     /* check \n termination in case journal writing crashed mid-line */
1792     if (entry_len == 0)
1793       continue;
1794     else if (entry[entry_len - 1] != '\n')
1795     {
1796       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1797       ++fail_cnt;
1798       continue;
1799     }
1800
1801     entry[entry_len - 1] = '\0';
1802
1803     if (handle_request(NULL, now, entry, entry_len) == 0)
1804       ++entry_cnt;
1805     else
1806       ++fail_cnt;
1807   }
1808
1809   fclose(fh);
1810
1811   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1812            entry_cnt, fail_cnt);
1813
1814   return entry_cnt > 0 ? 1 : 0;
1815 } /* }}} static int journal_replay */
1816
1817 static void journal_init(void) /* {{{ */
1818 {
1819   int had_journal = 0;
1820
1821   if (journal_cur == NULL) return;
1822
1823   pthread_mutex_lock(&journal_lock);
1824
1825   RRDD_LOG(LOG_INFO, "checking for journal files");
1826
1827   had_journal += journal_replay(journal_old);
1828   had_journal += journal_replay(journal_cur);
1829
1830   /* it must have been a crash.  start a flush */
1831   if (had_journal && config_flush_at_shutdown)
1832     flush_old_values(-1);
1833
1834   pthread_mutex_unlock(&journal_lock);
1835   journal_rotate();
1836
1837   RRDD_LOG(LOG_INFO, "journal processing complete");
1838
1839 } /* }}} static void journal_init */
1840
1841 static void close_connection(listen_socket_t *sock)
1842 {
1843   close(sock->fd) ;  sock->fd   = -1;
1844   free(sock->rbuf);  sock->rbuf = NULL;
1845   free(sock->wbuf);  sock->wbuf = NULL;
1846
1847   free(sock);
1848 }
1849
1850 static void *connection_thread_main (void *args) /* {{{ */
1851 {
1852   pthread_t self;
1853   listen_socket_t *sock;
1854   int i;
1855   int fd;
1856
1857   sock = (listen_socket_t *) args;
1858   fd = sock->fd;
1859
1860   /* init read buffers */
1861   sock->next_read = sock->next_cmd = 0;
1862   sock->rbuf = malloc(RBUF_SIZE);
1863   if (sock->rbuf == NULL)
1864   {
1865     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1866     close_connection(sock);
1867     return NULL;
1868   }
1869
1870   pthread_mutex_lock (&connection_threads_lock);
1871   {
1872     pthread_t *temp;
1873
1874     temp = (pthread_t *) realloc (connection_threads,
1875         sizeof (pthread_t) * (connection_threads_num + 1));
1876     if (temp == NULL)
1877     {
1878       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1879     }
1880     else
1881     {
1882       connection_threads = temp;
1883       connection_threads[connection_threads_num] = pthread_self ();
1884       connection_threads_num++;
1885     }
1886   }
1887   pthread_mutex_unlock (&connection_threads_lock);
1888
1889   while (do_shutdown == 0)
1890   {
1891     char *cmd;
1892     ssize_t cmd_len;
1893     ssize_t rbytes;
1894     time_t now;
1895
1896     struct pollfd pollfd;
1897     int status;
1898
1899     pollfd.fd = fd;
1900     pollfd.events = POLLIN | POLLPRI;
1901     pollfd.revents = 0;
1902
1903     status = poll (&pollfd, 1, /* timeout = */ 500);
1904     if (do_shutdown)
1905       break;
1906     else if (status == 0) /* timeout */
1907       continue;
1908     else if (status < 0) /* error */
1909     {
1910       status = errno;
1911       if (status != EINTR)
1912         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1913       continue;
1914     }
1915
1916     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1917       break;
1918     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1919     {
1920       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1921           "poll(2) returned something unexpected: %#04hx",
1922           pollfd.revents);
1923       break;
1924     }
1925
1926     rbytes = read(fd, sock->rbuf + sock->next_read,
1927                   RBUF_SIZE - sock->next_read);
1928     if (rbytes < 0)
1929     {
1930       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1931       break;
1932     }
1933     else if (rbytes == 0)
1934       break; /* eof */
1935
1936     sock->next_read += rbytes;
1937
1938     if (sock->batch_start)
1939       now = sock->batch_start;
1940     else
1941       now = time(NULL);
1942
1943     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1944     {
1945       status = handle_request (sock, now, cmd, cmd_len+1);
1946       if (status != 0)
1947         goto out_close;
1948     }
1949   }
1950
1951 out_close:
1952   close_connection(sock);
1953
1954   self = pthread_self ();
1955   /* Remove this thread from the connection threads list */
1956   pthread_mutex_lock (&connection_threads_lock);
1957   /* Find out own index in the array */
1958   for (i = 0; i < connection_threads_num; i++)
1959     if (pthread_equal (connection_threads[i], self) != 0)
1960       break;
1961   assert (i < connection_threads_num);
1962
1963   /* Move the trailing threads forward. */
1964   if (i < (connection_threads_num - 1))
1965   {
1966     memmove (connection_threads + i,
1967         connection_threads + i + 1,
1968         sizeof (pthread_t) * (connection_threads_num - i - 1));
1969   }
1970
1971   connection_threads_num--;
1972   pthread_mutex_unlock (&connection_threads_lock);
1973
1974   return (NULL);
1975 } /* }}} void *connection_thread_main */
1976
1977 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1978 {
1979   int fd;
1980   struct sockaddr_un sa;
1981   listen_socket_t *temp;
1982   int status;
1983   const char *path;
1984
1985   path = sock->addr;
1986   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1987     path += strlen("unix:");
1988
1989   temp = (listen_socket_t *) realloc (listen_fds,
1990       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1991   if (temp == NULL)
1992   {
1993     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1994     return (-1);
1995   }
1996   listen_fds = temp;
1997   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1998
1999   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2000   if (fd < 0)
2001   {
2002     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
2003     return (-1);
2004   }
2005
2006   memset (&sa, 0, sizeof (sa));
2007   sa.sun_family = AF_UNIX;
2008   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2009
2010   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2011   if (status != 0)
2012   {
2013     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
2014     close (fd);
2015     unlink (path);
2016     return (-1);
2017   }
2018
2019   status = listen (fd, /* backlog = */ 10);
2020   if (status != 0)
2021   {
2022     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
2023     close (fd);
2024     unlink (path);
2025     return (-1);
2026   }
2027
2028   listen_fds[listen_fds_num].fd = fd;
2029   listen_fds[listen_fds_num].family = PF_UNIX;
2030   strncpy(listen_fds[listen_fds_num].addr, path,
2031           sizeof (listen_fds[listen_fds_num].addr) - 1);
2032   listen_fds_num++;
2033
2034   return (0);
2035 } /* }}} int open_listen_socket_unix */
2036
2037 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2038 {
2039   struct addrinfo ai_hints;
2040   struct addrinfo *ai_res;
2041   struct addrinfo *ai_ptr;
2042   char addr_copy[NI_MAXHOST];
2043   char *addr;
2044   char *port;
2045   int status;
2046
2047   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2048   addr_copy[sizeof (addr_copy) - 1] = 0;
2049   addr = addr_copy;
2050
2051   memset (&ai_hints, 0, sizeof (ai_hints));
2052   ai_hints.ai_flags = 0;
2053 #ifdef AI_ADDRCONFIG
2054   ai_hints.ai_flags |= AI_ADDRCONFIG;
2055 #endif
2056   ai_hints.ai_family = AF_UNSPEC;
2057   ai_hints.ai_socktype = SOCK_STREAM;
2058
2059   port = NULL;
2060   if (*addr == '[') /* IPv6+port format */
2061   {
2062     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2063     addr++;
2064
2065     port = strchr (addr, ']');
2066     if (port == NULL)
2067     {
2068       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2069           sock->addr);
2070       return (-1);
2071     }
2072     *port = 0;
2073     port++;
2074
2075     if (*port == ':')
2076       port++;
2077     else if (*port == 0)
2078       port = NULL;
2079     else
2080     {
2081       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2082           port);
2083       return (-1);
2084     }
2085   } /* if (*addr = ']') */
2086   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2087   {
2088     port = rindex(addr, ':');
2089     if (port != NULL)
2090     {
2091       *port = 0;
2092       port++;
2093     }
2094   }
2095   ai_res = NULL;
2096   status = getaddrinfo (addr,
2097                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2098                         &ai_hints, &ai_res);
2099   if (status != 0)
2100   {
2101     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2102         "%s", addr, gai_strerror (status));
2103     return (-1);
2104   }
2105
2106   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2107   {
2108     int fd;
2109     listen_socket_t *temp;
2110     int one = 1;
2111
2112     temp = (listen_socket_t *) realloc (listen_fds,
2113         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2114     if (temp == NULL)
2115     {
2116       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2117       continue;
2118     }
2119     listen_fds = temp;
2120     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2121
2122     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2123     if (fd < 0)
2124     {
2125       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2126       continue;
2127     }
2128
2129     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2130
2131     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2132     if (status != 0)
2133     {
2134       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2135       close (fd);
2136       continue;
2137     }
2138
2139     status = listen (fd, /* backlog = */ 10);
2140     if (status != 0)
2141     {
2142       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2143       close (fd);
2144       return (-1);
2145     }
2146
2147     listen_fds[listen_fds_num].fd = fd;
2148     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2149     listen_fds_num++;
2150   } /* for (ai_ptr) */
2151
2152   return (0);
2153 } /* }}} static int open_listen_socket_network */
2154
2155 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2156 {
2157   assert(sock != NULL);
2158   assert(sock->addr != NULL);
2159
2160   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2161       || sock->addr[0] == '/')
2162     return (open_listen_socket_unix(sock));
2163   else
2164     return (open_listen_socket_network(sock));
2165 } /* }}} int open_listen_socket */
2166
2167 static int close_listen_sockets (void) /* {{{ */
2168 {
2169   size_t i;
2170
2171   for (i = 0; i < listen_fds_num; i++)
2172   {
2173     close (listen_fds[i].fd);
2174
2175     if (listen_fds[i].family == PF_UNIX)
2176       unlink(listen_fds[i].addr);
2177   }
2178
2179   free (listen_fds);
2180   listen_fds = NULL;
2181   listen_fds_num = 0;
2182
2183   return (0);
2184 } /* }}} int close_listen_sockets */
2185
2186 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2187 {
2188   struct pollfd *pollfds;
2189   int pollfds_num;
2190   int status;
2191   int i;
2192
2193   for (i = 0; i < config_listen_address_list_len; i++)
2194     open_listen_socket (config_listen_address_list[i]);
2195
2196   if (config_listen_address_list_len < 1)
2197   {
2198     listen_socket_t sock;
2199     memset(&sock, 0, sizeof(sock));
2200     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2201     open_listen_socket (&sock);
2202   }
2203
2204   if (listen_fds_num < 1)
2205   {
2206     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2207         "could be opened. Sorry.");
2208     return (NULL);
2209   }
2210
2211   pollfds_num = listen_fds_num;
2212   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2213   if (pollfds == NULL)
2214   {
2215     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2216     return (NULL);
2217   }
2218   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2219
2220   RRDD_LOG(LOG_INFO, "listening for connections");
2221
2222   while (do_shutdown == 0)
2223   {
2224     assert (pollfds_num == ((int) listen_fds_num));
2225     for (i = 0; i < pollfds_num; i++)
2226     {
2227       pollfds[i].fd = listen_fds[i].fd;
2228       pollfds[i].events = POLLIN | POLLPRI;
2229       pollfds[i].revents = 0;
2230     }
2231
2232     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2233     if (do_shutdown)
2234       break;
2235     else if (status == 0) /* timeout */
2236       continue;
2237     else if (status < 0) /* error */
2238     {
2239       status = errno;
2240       if (status != EINTR)
2241       {
2242         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2243       }
2244       continue;
2245     }
2246
2247     for (i = 0; i < pollfds_num; i++)
2248     {
2249       listen_socket_t *client_sock;
2250       struct sockaddr_storage client_sa;
2251       socklen_t client_sa_size;
2252       pthread_t tid;
2253       pthread_attr_t attr;
2254
2255       if (pollfds[i].revents == 0)
2256         continue;
2257
2258       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2259       {
2260         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2261             "poll(2) returned something unexpected for listen FD #%i.",
2262             pollfds[i].fd);
2263         continue;
2264       }
2265
2266       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2267       if (client_sock == NULL)
2268       {
2269         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2270         continue;
2271       }
2272       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2273
2274       client_sa_size = sizeof (client_sa);
2275       client_sock->fd = accept (pollfds[i].fd,
2276           (struct sockaddr *) &client_sa, &client_sa_size);
2277       if (client_sock->fd < 0)
2278       {
2279         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2280         free(client_sock);
2281         continue;
2282       }
2283
2284       pthread_attr_init (&attr);
2285       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2286
2287       status = pthread_create (&tid, &attr, connection_thread_main,
2288                                client_sock);
2289       if (status != 0)
2290       {
2291         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2292         close_connection(client_sock);
2293         continue;
2294       }
2295     } /* for (pollfds_num) */
2296   } /* while (do_shutdown == 0) */
2297
2298   RRDD_LOG(LOG_INFO, "starting shutdown");
2299
2300   close_listen_sockets ();
2301
2302   pthread_mutex_lock (&connection_threads_lock);
2303   while (connection_threads_num > 0)
2304   {
2305     pthread_t wait_for;
2306
2307     wait_for = connection_threads[0];
2308
2309     pthread_mutex_unlock (&connection_threads_lock);
2310     pthread_join (wait_for, /* retval = */ NULL);
2311     pthread_mutex_lock (&connection_threads_lock);
2312   }
2313   pthread_mutex_unlock (&connection_threads_lock);
2314
2315   return (NULL);
2316 } /* }}} void *listen_thread_main */
2317
2318 static int daemonize (void) /* {{{ */
2319 {
2320   int status;
2321   int fd;
2322   char *base_dir;
2323
2324   daemon_uid = geteuid();
2325
2326   fd = open_pidfile();
2327   if (fd < 0) return fd;
2328
2329   if (!stay_foreground)
2330   {
2331     pid_t child;
2332
2333     child = fork ();
2334     if (child < 0)
2335     {
2336       fprintf (stderr, "daemonize: fork(2) failed.\n");
2337       return (-1);
2338     }
2339     else if (child > 0)
2340     {
2341       return (1);
2342     }
2343
2344     /* Become session leader */
2345     setsid ();
2346
2347     /* Open the first three file descriptors to /dev/null */
2348     close (2);
2349     close (1);
2350     close (0);
2351
2352     open ("/dev/null", O_RDWR);
2353     dup (0);
2354     dup (0);
2355   } /* if (!stay_foreground) */
2356
2357   /* Change into the /tmp directory. */
2358   base_dir = (config_base_dir != NULL)
2359     ? config_base_dir
2360     : "/tmp";
2361   status = chdir (base_dir);
2362   if (status != 0)
2363   {
2364     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2365     return (-1);
2366   }
2367
2368   install_signal_handlers();
2369
2370   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2371   RRDD_LOG(LOG_INFO, "starting up");
2372
2373   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2374   if (cache_tree == NULL)
2375   {
2376     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2377     return (-1);
2378   }
2379
2380   status = write_pidfile (fd);
2381   return status;
2382 } /* }}} int daemonize */
2383
2384 static int cleanup (void) /* {{{ */
2385 {
2386   do_shutdown++;
2387
2388   pthread_cond_signal (&cache_cond);
2389   pthread_join (queue_thread, /* return = */ NULL);
2390
2391   remove_pidfile ();
2392
2393   RRDD_LOG(LOG_INFO, "goodbye");
2394   closelog ();
2395
2396   return (0);
2397 } /* }}} int cleanup */
2398
2399 static int read_options (int argc, char **argv) /* {{{ */
2400 {
2401   int option;
2402   int status = 0;
2403
2404   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2405   {
2406     switch (option)
2407     {
2408       case 'g':
2409         stay_foreground=1;
2410         break;
2411
2412       case 'L':
2413       case 'l':
2414       {
2415         listen_socket_t **temp;
2416         listen_socket_t *new;
2417
2418         new = malloc(sizeof(listen_socket_t));
2419         if (new == NULL)
2420         {
2421           fprintf(stderr, "read_options: malloc failed.\n");
2422           return(2);
2423         }
2424         memset(new, 0, sizeof(listen_socket_t));
2425
2426         temp = (listen_socket_t **) realloc (config_listen_address_list,
2427             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2428         if (temp == NULL)
2429         {
2430           fprintf (stderr, "read_options: realloc failed.\n");
2431           return (2);
2432         }
2433         config_listen_address_list = temp;
2434
2435         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2436         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2437
2438         temp[config_listen_address_list_len] = new;
2439         config_listen_address_list_len++;
2440       }
2441       break;
2442
2443       case 'f':
2444       {
2445         int temp;
2446
2447         temp = atoi (optarg);
2448         if (temp > 0)
2449           config_flush_interval = temp;
2450         else
2451         {
2452           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2453           status = 3;
2454         }
2455       }
2456       break;
2457
2458       case 'w':
2459       {
2460         int temp;
2461
2462         temp = atoi (optarg);
2463         if (temp > 0)
2464           config_write_interval = temp;
2465         else
2466         {
2467           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2468           status = 2;
2469         }
2470       }
2471       break;
2472
2473       case 'z':
2474       {
2475         int temp;
2476
2477         temp = atoi(optarg);
2478         if (temp > 0)
2479           config_write_jitter = temp;
2480         else
2481         {
2482           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2483           status = 2;
2484         }
2485
2486         break;
2487       }
2488
2489       case 'B':
2490         config_write_base_only = 1;
2491         break;
2492
2493       case 'b':
2494       {
2495         size_t len;
2496         char base_realpath[PATH_MAX];
2497
2498         if (config_base_dir != NULL)
2499           free (config_base_dir);
2500         config_base_dir = strdup (optarg);
2501         if (config_base_dir == NULL)
2502         {
2503           fprintf (stderr, "read_options: strdup failed.\n");
2504           return (3);
2505         }
2506
2507         /* make sure that the base directory is not resolved via
2508          * symbolic links.  this makes some performance-enhancing
2509          * assumptions possible (we don't have to resolve paths
2510          * that start with a "/")
2511          */
2512         if (realpath(config_base_dir, base_realpath) == NULL)
2513         {
2514           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2515           return 5;
2516         }
2517         else if (strncmp(config_base_dir,
2518                          base_realpath, sizeof(base_realpath)) != 0)
2519         {
2520           fprintf(stderr,
2521                   "Base directory (-b) resolved via file system links!\n"
2522                   "Please consult rrdcached '-b' documentation!\n"
2523                   "Consider specifying the real directory (%s)\n",
2524                   base_realpath);
2525           return 5;
2526         }
2527
2528         len = strlen (config_base_dir);
2529         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2530         {
2531           config_base_dir[len - 1] = 0;
2532           len--;
2533         }
2534
2535         if (len < 1)
2536         {
2537           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2538           return (4);
2539         }
2540
2541         _config_base_dir_len = len;
2542       }
2543       break;
2544
2545       case 'p':
2546       {
2547         if (config_pid_file != NULL)
2548           free (config_pid_file);
2549         config_pid_file = strdup (optarg);
2550         if (config_pid_file == NULL)
2551         {
2552           fprintf (stderr, "read_options: strdup failed.\n");
2553           return (3);
2554         }
2555       }
2556       break;
2557
2558       case 'F':
2559         config_flush_at_shutdown = 1;
2560         break;
2561
2562       case 'j':
2563       {
2564         struct stat statbuf;
2565         const char *dir = optarg;
2566
2567         status = stat(dir, &statbuf);
2568         if (status != 0)
2569         {
2570           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2571           return 6;
2572         }
2573
2574         if (!S_ISDIR(statbuf.st_mode)
2575             || access(dir, R_OK|W_OK|X_OK) != 0)
2576         {
2577           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2578                   errno ? rrd_strerror(errno) : "");
2579           return 6;
2580         }
2581
2582         journal_cur = malloc(PATH_MAX + 1);
2583         journal_old = malloc(PATH_MAX + 1);
2584         if (journal_cur == NULL || journal_old == NULL)
2585         {
2586           fprintf(stderr, "malloc failure for journal files\n");
2587           return 6;
2588         }
2589         else 
2590         {
2591           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2592           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2593         }
2594       }
2595       break;
2596
2597       case 'h':
2598       case '?':
2599         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2600             "\n"
2601             "Usage: rrdcached [options]\n"
2602             "\n"
2603             "Valid options are:\n"
2604             "  -l <address>  Socket address to listen to.\n"
2605             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2606             "  -w <seconds>  Interval in which to write data.\n"
2607             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2608             "  -f <seconds>  Interval in which to flush dead data.\n"
2609             "  -p <file>     Location of the PID-file.\n"
2610             "  -b <dir>      Base directory to change to.\n"
2611             "  -B            Restrict file access to paths within -b <dir>\n"
2612             "  -g            Do not fork and run in the foreground.\n"
2613             "  -j <dir>      Directory in which to create the journal files.\n"
2614             "  -F            Always flush all updates at shutdown\n"
2615             "\n"
2616             "For more information and a detailed description of all options "
2617             "please refer\n"
2618             "to the rrdcached(1) manual page.\n",
2619             VERSION);
2620         status = -1;
2621         break;
2622     } /* switch (option) */
2623   } /* while (getopt) */
2624
2625   /* advise the user when values are not sane */
2626   if (config_flush_interval < 2 * config_write_interval)
2627     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2628             " 2x write interval (-w) !\n");
2629   if (config_write_jitter > config_write_interval)
2630     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2631             " write interval (-w) !\n");
2632
2633   if (config_write_base_only && config_base_dir == NULL)
2634     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2635             "  Consult the rrdcached documentation\n");
2636
2637   if (journal_cur == NULL)
2638     config_flush_at_shutdown = 1;
2639
2640   return (status);
2641 } /* }}} int read_options */
2642
2643 int main (int argc, char **argv)
2644 {
2645   int status;
2646
2647   status = read_options (argc, argv);
2648   if (status != 0)
2649   {
2650     if (status < 0)
2651       status = 0;
2652     return (status);
2653   }
2654
2655   status = daemonize ();
2656   if (status == 1)
2657   {
2658     struct sigaction sigchld;
2659
2660     memset (&sigchld, 0, sizeof (sigchld));
2661     sigchld.sa_handler = SIG_IGN;
2662     sigaction (SIGCHLD, &sigchld, NULL);
2663
2664     return (0);
2665   }
2666   else if (status != 0)
2667   {
2668     fprintf (stderr, "daemonize failed, exiting.\n");
2669     return (1);
2670   }
2671
2672   journal_init();
2673
2674   /* start the queue thread */
2675   memset (&queue_thread, 0, sizeof (queue_thread));
2676   status = pthread_create (&queue_thread,
2677                            NULL, /* attr */
2678                            queue_thread_main,
2679                            NULL); /* args */
2680   if (status != 0)
2681   {
2682     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2683     cleanup();
2684     return (1);
2685   }
2686
2687   listen_thread_main (NULL);
2688   cleanup ();
2689
2690   return (0);
2691 } /* int main */
2692
2693 /*
2694  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2695  */