this ensures that the response is protocol-compliant even if additional
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
149
150 struct callback_flush_data_s
151 {
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
158
159 enum queue_side_e
160 {
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
165
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
169
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
175
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
178
179 static int do_shutdown = 0;
180
181 static pthread_t queue_thread;
182
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
186
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
193
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
202
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
205
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
223
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
233
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235 {
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
238
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240 {
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
243
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
249
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
255
256 static void install_signal_handlers(void) /* {{{ */
257 {
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
265
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
270
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
274
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
278
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
282
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
286
287 } /* }}} void install_signal_handlers */
288
289 static int open_pidfile(void) /* {{{ */
290 {
291   int fd;
292   char *file;
293
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
297
298   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301             file, rrd_strerror(errno));
302
303   return(fd);
304 } /* }}} static int open_pidfile */
305
306 static int write_pidfile (int fd) /* {{{ */
307 {
308   pid_t pid;
309   FILE *fh;
310
311   pid = getpid ();
312
313   fh = fdopen (fd, "w");
314   if (fh == NULL)
315   {
316     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
317     close(fd);
318     return (-1);
319   }
320
321   fprintf (fh, "%i\n", (int) pid);
322   fclose (fh);
323
324   return (0);
325 } /* }}} int write_pidfile */
326
327 static int remove_pidfile (void) /* {{{ */
328 {
329   char *file;
330   int status;
331
332   file = (config_pid_file != NULL)
333     ? config_pid_file
334     : LOCALSTATEDIR "/run/rrdcached.pid";
335
336   status = unlink (file);
337   if (status == 0)
338     return (0);
339   return (errno);
340 } /* }}} int remove_pidfile */
341
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
343 {
344   char *eol;
345
346   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347                sock->next_read - sock->next_cmd);
348
349   if (eol == NULL)
350   {
351     /* no commands left, move remainder back to front of rbuf */
352     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353             sock->next_read - sock->next_cmd);
354     sock->next_read -= sock->next_cmd;
355     sock->next_cmd = 0;
356     *len = 0;
357     return NULL;
358   }
359   else
360   {
361     char *cmd = sock->rbuf + sock->next_cmd;
362     *eol = '\0';
363
364     sock->next_cmd = eol - sock->rbuf + 1;
365
366     if (eol > sock->rbuf && *(eol-1) == '\r')
367       *(--eol) = '\0'; /* handle "\r\n" EOL */
368
369     *len = eol - cmd;
370
371     return cmd;
372   }
373
374   /* NOTREACHED */
375   assert(1==0);
376 }
377
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
380 {
381   char *new_buf;
382
383   assert(sock != NULL);
384
385   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
386   if (new_buf == NULL)
387   {
388     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
389     return -1;
390   }
391
392   strncpy(new_buf + sock->wbuf_len, str, len + 1);
393
394   sock->wbuf = new_buf;
395   sock->wbuf_len += len;
396
397   return 0;
398 } /* }}} static int add_to_wbuf */
399
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
402 {
403   va_list argp;
404   char buffer[CMD_MAX];
405   int len;
406
407   if (sock == NULL) return 0; /* journal replay mode */
408   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
409
410   va_start(argp, fmt);
411 #ifdef HAVE_VSNPRINTF
412   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
413 #else
414   len = vsprintf(buffer, fmt, argp);
415 #endif
416   va_end(argp);
417   if (len < 0)
418   {
419     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
420     return -1;
421   }
422
423   return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
425
426 static int count_lines(char *str) /* {{{ */
427 {
428   int lines = 0;
429
430   if (str != NULL)
431   {
432     while ((str = strchr(str, '\n')) != NULL)
433     {
434       ++lines;
435       ++str;
436     }
437   }
438
439   return lines;
440 } /* }}} static int count_lines */
441
442 /* send the response back to the user.
443  * returns 0 on success, -1 on error
444  * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446                           char *fmt, ...) /* {{{ */
447 {
448   va_list argp;
449   char buffer[CMD_MAX];
450   int lines;
451   ssize_t wrote;
452   int rclen, len;
453
454   if (sock == NULL) return rc;  /* journal replay mode */
455
456   if (sock->batch_mode)
457   {
458     if (rc == RESP_OK)
459       return rc; /* no response on success during BATCH */
460     lines = sock->batch_cmd;
461   }
462   else if (rc == RESP_OK)
463     lines = count_lines(sock->wbuf);
464   else
465     lines = -1;
466
467   rclen = sprintf(buffer, "%d ", lines);
468   va_start(argp, fmt);
469 #ifdef HAVE_VSNPRINTF
470   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
471 #else
472   len = vsprintf(buffer+rclen, fmt, argp);
473 #endif
474   va_end(argp);
475   if (len < 0)
476     return -1;
477
478   len += rclen;
479
480   /* append the result to the wbuf, don't write to the user */
481   if (sock->batch_mode)
482     return add_to_wbuf(sock, buffer, len);
483
484   /* first write must be complete */
485   if (len != write(sock->fd, buffer, len))
486   {
487     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
488     return -1;
489   }
490
491   if (sock->wbuf != NULL && rc == RESP_OK)
492   {
493     wrote = 0;
494     while (wrote < sock->wbuf_len)
495     {
496       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
497       if (wb <= 0)
498       {
499         RRDD_LOG(LOG_INFO, "send_response: could not write results");
500         return -1;
501       }
502       wrote += wb;
503     }
504   }
505
506   free(sock->wbuf); sock->wbuf = NULL;
507   sock->wbuf_len = 0;
508
509   return 0;
510 } /* }}} */
511
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
513 {
514   ci->values = NULL;
515   ci->values_num = 0;
516
517   ci->last_flush_time = when;
518   if (config_write_jitter > 0)
519     ci->last_flush_time += (random() % config_write_jitter);
520 }
521
522 /* remove_from_queue
523  * remove a "cache_item_t" item from the queue.
524  * must hold 'cache_lock' when calling this
525  */
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
527 {
528   if (ci == NULL) return;
529
530   if (ci->prev == NULL)
531     cache_queue_head = ci->next; /* reset head */
532   else
533     ci->prev->next = ci->next;
534
535   if (ci->next == NULL)
536     cache_queue_tail = ci->prev; /* reset the tail */
537   else
538     ci->next->prev = ci->prev;
539
540   ci->next = ci->prev = NULL;
541   ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
543
544 /* remove an entry from the tree and free all its resources.
545  * must hold 'cache lock' while calling this.
546  * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
548 {
549   cache_item_t *ci;
550
551   ci = g_tree_lookup(cache_tree, file);
552   if (ci == NULL)
553     return ENOENT;
554
555   g_tree_remove (cache_tree, file);
556   remove_from_queue(ci);
557
558   for (int i=0; i < ci->values_num; i++)
559     free(ci->values[i]);
560
561   free (ci->values);
562   free (ci->file);
563
564   /* in case anyone is waiting */
565   pthread_cond_broadcast(&ci->flushed);
566
567   free (ci);
568
569   return 0;
570 } /* }}} static int forget_file */
571
572 /*
573  * enqueue_cache_item:
574  * `cache_lock' must be acquired before calling this function!
575  */
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
577     queue_side_t side)
578 {
579   if (ci == NULL)
580     return (-1);
581
582   if (ci->values_num == 0)
583     return (0);
584
585   if (side == HEAD)
586   {
587     if (cache_queue_head == ci)
588       return 0;
589
590     /* remove from the double linked list */
591     if (ci->flags & CI_FLAGS_IN_QUEUE)
592       remove_from_queue(ci);
593
594     ci->prev = NULL;
595     ci->next = cache_queue_head;
596     if (ci->next != NULL)
597       ci->next->prev = ci;
598     cache_queue_head = ci;
599
600     if (cache_queue_tail == NULL)
601       cache_queue_tail = cache_queue_head;
602   }
603   else /* (side == TAIL) */
604   {
605     /* We don't move values back in the list.. */
606     if (ci->flags & CI_FLAGS_IN_QUEUE)
607       return (0);
608
609     assert (ci->next == NULL);
610     assert (ci->prev == NULL);
611
612     ci->prev = cache_queue_tail;
613
614     if (cache_queue_tail == NULL)
615       cache_queue_head = ci;
616     else
617       cache_queue_tail->next = ci;
618
619     cache_queue_tail = ci;
620   }
621
622   ci->flags |= CI_FLAGS_IN_QUEUE;
623
624   pthread_cond_broadcast(&cache_cond);
625   pthread_mutex_lock (&stats_lock);
626   stats_queue_length++;
627   pthread_mutex_unlock (&stats_lock);
628
629   return (0);
630 } /* }}} int enqueue_cache_item */
631
632 /*
633  * tree_callback_flush:
634  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635  * while this is in progress.
636  */
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
638     gpointer data)
639 {
640   cache_item_t *ci;
641   callback_flush_data_t *cfd;
642
643   ci = (cache_item_t *) value;
644   cfd = (callback_flush_data_t *) data;
645
646   if ((ci->last_flush_time <= cfd->abs_timeout)
647       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648       && (ci->values_num > 0))
649   {
650     enqueue_cache_item (ci, TAIL);
651   }
652   else if ((do_shutdown != 0)
653       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654       && (ci->values_num > 0))
655   {
656     enqueue_cache_item (ci, TAIL);
657   }
658   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660       && (ci->values_num <= 0))
661   {
662     char **temp;
663
664     temp = (char **) realloc (cfd->keys,
665         sizeof (char *) * (cfd->keys_num + 1));
666     if (temp == NULL)
667     {
668       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
669       return (FALSE);
670     }
671     cfd->keys = temp;
672     /* Make really sure this points to the _same_ place */
673     assert ((char *) key == ci->file);
674     cfd->keys[cfd->keys_num] = (char *) key;
675     cfd->keys_num++;
676   }
677
678   return (FALSE);
679 } /* }}} gboolean tree_callback_flush */
680
681 static int flush_old_values (int max_age)
682 {
683   callback_flush_data_t cfd;
684   size_t k;
685
686   memset (&cfd, 0, sizeof (cfd));
687   /* Pass the current time as user data so that we don't need to call
688    * `time' for each node. */
689   cfd.now = time (NULL);
690   cfd.keys = NULL;
691   cfd.keys_num = 0;
692
693   if (max_age > 0)
694     cfd.abs_timeout = cfd.now - max_age;
695   else
696     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
697
698   /* `tree_callback_flush' will return the keys of all values that haven't
699    * been touched in the last `config_flush_interval' seconds in `cfd'.
700    * The char*'s in this array point to the same memory as ci->file, so we
701    * don't need to free them separately. */
702   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
703
704   for (k = 0; k < cfd.keys_num; k++)
705   {
706     /* should never fail, since we have held the cache_lock
707      * the entire time */
708     assert( forget_file(cfd.keys[k]) == 0 );
709   }
710
711   if (cfd.keys != NULL)
712   {
713     free (cfd.keys);
714     cfd.keys = NULL;
715   }
716
717   return (0);
718 } /* int flush_old_values */
719
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
721 {
722   struct timeval now;
723   struct timespec next_flush;
724   int final_flush = 0; /* make sure we only flush once on shutdown */
725
726   gettimeofday (&now, NULL);
727   next_flush.tv_sec = now.tv_sec + config_flush_interval;
728   next_flush.tv_nsec = 1000 * now.tv_usec;
729
730   pthread_mutex_lock (&cache_lock);
731   while ((do_shutdown == 0) || (cache_queue_head != NULL))
732   {
733     cache_item_t *ci;
734     char *file;
735     char **values;
736     int values_num;
737     int status;
738     int i;
739
740     /* First, check if it's time to do the cache flush. */
741     gettimeofday (&now, NULL);
742     if ((now.tv_sec > next_flush.tv_sec)
743         || ((now.tv_sec == next_flush.tv_sec)
744           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
745     {
746       /* Flush all values that haven't been written in the last
747        * `config_write_interval' seconds. */
748       flush_old_values (config_write_interval);
749
750       /* Determine the time of the next cache flush. */
751       while (next_flush.tv_sec <= now.tv_sec)
752         next_flush.tv_sec += config_flush_interval;
753
754       /* unlock the cache while we rotate so we don't block incoming
755        * updates if the fsync() blocks on disk I/O */
756       pthread_mutex_unlock(&cache_lock);
757       journal_rotate();
758       pthread_mutex_lock(&cache_lock);
759     }
760
761     /* Now, check if there's something to store away. If not, wait until
762      * something comes in or it's time to do the cache flush.  if we are
763      * shutting down, do not wait around.  */
764     if (cache_queue_head == NULL && !do_shutdown)
765     {
766       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767       if ((status != 0) && (status != ETIMEDOUT))
768       {
769         RRDD_LOG (LOG_ERR, "queue_thread_main: "
770             "pthread_cond_timedwait returned %i.", status);
771       }
772     }
773
774     /* We're about to shut down */
775     if (do_shutdown != 0 && !final_flush++)
776     {
777       if (config_flush_at_shutdown)
778         flush_old_values (-1); /* flush everything */
779       else
780         break;
781     }
782
783     /* Check if a value has arrived. This may be NULL if we timed out or there
784      * was an interrupt such as a signal. */
785     if (cache_queue_head == NULL)
786       continue;
787
788     ci = cache_queue_head;
789
790     /* copy the relevant parts */
791     file = strdup (ci->file);
792     if (file == NULL)
793     {
794       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
795       continue;
796     }
797
798     assert(ci->values != NULL);
799     assert(ci->values_num > 0);
800
801     values = ci->values;
802     values_num = ci->values_num;
803
804     wipe_ci_values(ci, time(NULL));
805     remove_from_queue(ci);
806
807     pthread_mutex_lock (&stats_lock);
808     assert (stats_queue_length > 0);
809     stats_queue_length--;
810     pthread_mutex_unlock (&stats_lock);
811
812     pthread_mutex_unlock (&cache_lock);
813
814     rrd_clear_error ();
815     status = rrd_update_r (file, NULL, values_num, (void *) values);
816     if (status != 0)
817     {
818       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819           "rrd_update_r (%s) failed with status %i. (%s)",
820           file, status, rrd_get_error());
821     }
822
823     journal_write("wrote", file);
824     pthread_cond_broadcast(&ci->flushed);
825
826     for (i = 0; i < values_num; i++)
827       free (values[i]);
828
829     free(values);
830     free(file);
831
832     if (status == 0)
833     {
834       pthread_mutex_lock (&stats_lock);
835       stats_updates_written++;
836       stats_data_sets_written += values_num;
837       pthread_mutex_unlock (&stats_lock);
838     }
839
840     pthread_mutex_lock (&cache_lock);
841
842     /* We're about to shut down */
843     if (do_shutdown != 0 && !final_flush++)
844     {
845       if (config_flush_at_shutdown)
846           flush_old_values (-1); /* flush everything */
847       else
848         break;
849     }
850   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851   pthread_mutex_unlock (&cache_lock);
852
853   if (config_flush_at_shutdown)
854   {
855     assert(cache_queue_head == NULL);
856     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
857   }
858
859   journal_done();
860
861   return (NULL);
862 } /* }}} void *queue_thread_main */
863
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865     size_t *buffer_size_ret, char **field_ret)
866 {
867   char *buffer;
868   size_t buffer_pos;
869   size_t buffer_size;
870   char *field;
871   size_t field_size;
872   int status;
873
874   buffer = *buffer_ret;
875   buffer_pos = 0;
876   buffer_size = *buffer_size_ret;
877   field = *buffer_ret;
878   field_size = 0;
879
880   if (buffer_size <= 0)
881     return (-1);
882
883   /* This is ensured by `handle_request'. */
884   assert (buffer[buffer_size - 1] == '\0');
885
886   status = -1;
887   while (buffer_pos < buffer_size)
888   {
889     /* Check for end-of-field or end-of-buffer */
890     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
891     {
892       field[field_size] = 0;
893       field_size++;
894       buffer_pos++;
895       status = 0;
896       break;
897     }
898     /* Handle escaped characters. */
899     else if (buffer[buffer_pos] == '\\')
900     {
901       if (buffer_pos >= (buffer_size - 1))
902         break;
903       buffer_pos++;
904       field[field_size] = buffer[buffer_pos];
905       field_size++;
906       buffer_pos++;
907     }
908     /* Normal operation */ 
909     else
910     {
911       field[field_size] = buffer[buffer_pos];
912       field_size++;
913       buffer_pos++;
914     }
915   } /* while (buffer_pos < buffer_size) */
916
917   if (status != 0)
918     return (status);
919
920   *buffer_ret = buffer + buffer_pos;
921   *buffer_size_ret = buffer_size - buffer_pos;
922   *field_ret = field;
923
924   return (0);
925 } /* }}} int buffer_get_field */
926
927 /* if we're restricting writes to the base directory,
928  * check whether the file falls within the dir
929  * returns 1 if OK, otherwise 0
930  */
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
932 {
933   assert(file != NULL);
934
935   if (!config_write_base_only
936       || sock == NULL /* journal replay */
937       || config_base_dir == NULL)
938     return 1;
939
940   if (strstr(file, "../") != NULL) goto err;
941
942   /* relative paths without "../" are ok */
943   if (*file != '/') return 1;
944
945   /* file must be of the format base + "/" + <1+ char filename> */
946   if (strlen(file) < _config_base_dir_len + 2) goto err;
947   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948   if (*(file + _config_base_dir_len) != '/') goto err;
949
950   return 1;
951
952 err:
953   if (sock != NULL && sock->fd >= 0)
954     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
955
956   return 0;
957 } /* }}} static int check_file_access */
958
959 /* returns 1 if we have the required privilege level,
960  * otherwise issue an error to the user on sock */
961 static int has_privilege (listen_socket_t *sock, /* {{{ */
962                           socket_privilege priv)
963 {
964   if (sock == NULL) /* journal replay */
965     return 1;
966
967   if (sock->privilege >= priv)
968     return 1;
969
970   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
971 } /* }}} static int has_privilege */
972
973 static int flush_file (const char *filename) /* {{{ */
974 {
975   cache_item_t *ci;
976
977   pthread_mutex_lock (&cache_lock);
978
979   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
980   if (ci == NULL)
981   {
982     pthread_mutex_unlock (&cache_lock);
983     return (ENOENT);
984   }
985
986   if (ci->values_num > 0)
987   {
988     /* Enqueue at head */
989     enqueue_cache_item (ci, HEAD);
990     pthread_cond_wait(&ci->flushed, &cache_lock);
991   }
992
993   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
994    * may have been purged during our cond_wait() */
995
996   pthread_mutex_unlock(&cache_lock);
997
998   return (0);
999 } /* }}} int flush_file */
1000
1001 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1002     char *buffer, size_t buffer_size)
1003 {
1004   int status;
1005   char **help_text;
1006   char *command;
1007
1008   char *help_help[2] =
1009   {
1010     "Command overview\n"
1011     ,
1012     "HELP [<command>]\n"
1013     "FLUSH <filename>\n"
1014     "FLUSHALL\n"
1015     "PENDING <filename>\n"
1016     "FORGET <filename>\n"
1017     "UPDATE <filename> <values> [<values> ...]\n"
1018     "BATCH\n"
1019     "STATS\n"
1020   };
1021
1022   char *help_flush[2] =
1023   {
1024     "Help for FLUSH\n"
1025     ,
1026     "Usage: FLUSH <filename>\n"
1027     "\n"
1028     "Adds the given filename to the head of the update queue and returns\n"
1029     "after is has been dequeued.\n"
1030   };
1031
1032   char *help_flushall[2] =
1033   {
1034     "Help for FLUSHALL\n"
1035     ,
1036     "Usage: FLUSHALL\n"
1037     "\n"
1038     "Triggers writing of all pending updates.  Returns immediately.\n"
1039   };
1040
1041   char *help_pending[2] =
1042   {
1043     "Help for PENDING\n"
1044     ,
1045     "Usage: PENDING <filename>\n"
1046     "\n"
1047     "Shows any 'pending' updates for a file, in order.\n"
1048     "The updates shown have not yet been written to the underlying RRD file.\n"
1049   };
1050
1051   char *help_forget[2] =
1052   {
1053     "Help for FORGET\n"
1054     ,
1055     "Usage: FORGET <filename>\n"
1056     "\n"
1057     "Removes the file completely from the cache.\n"
1058     "Any pending updates for the file will be lost.\n"
1059   };
1060
1061   char *help_update[2] =
1062   {
1063     "Help for UPDATE\n"
1064     ,
1065     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1066     "\n"
1067     "Adds the given file to the internal cache if it is not yet known and\n"
1068     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1069     "for details.\n"
1070     "\n"
1071     "Each <values> has the following form:\n"
1072     "  <values> = <time>:<value>[:<value>[...]]\n"
1073     "See the rrdupdate(1) manpage for details.\n"
1074   };
1075
1076   char *help_stats[2] =
1077   {
1078     "Help for STATS\n"
1079     ,
1080     "Usage: STATS\n"
1081     "\n"
1082     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1083     "a description of the values.\n"
1084   };
1085
1086   char *help_batch[2] =
1087   {
1088     "Help for BATCH\n"
1089     ,
1090     "The 'BATCH' command permits the client to initiate a bulk load\n"
1091     "   of commands to rrdcached.\n"
1092     "\n"
1093     "Usage:\n"
1094     "\n"
1095     "    client: BATCH\n"
1096     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1097     "    client: command #1\n"
1098     "    client: command #2\n"
1099     "    client: ... and so on\n"
1100     "    client: .\n"
1101     "    server: 2 errors\n"
1102     "    server: 7 message for command #7\n"
1103     "    server: 9 message for command #9\n"
1104     "\n"
1105     "For more information, consult the rrdcached(1) documentation.\n"
1106   };
1107
1108   status = buffer_get_field (&buffer, &buffer_size, &command);
1109   if (status != 0)
1110     help_text = help_help;
1111   else
1112   {
1113     if (strcasecmp (command, "update") == 0)
1114       help_text = help_update;
1115     else if (strcasecmp (command, "flush") == 0)
1116       help_text = help_flush;
1117     else if (strcasecmp (command, "flushall") == 0)
1118       help_text = help_flushall;
1119     else if (strcasecmp (command, "pending") == 0)
1120       help_text = help_pending;
1121     else if (strcasecmp (command, "forget") == 0)
1122       help_text = help_forget;
1123     else if (strcasecmp (command, "stats") == 0)
1124       help_text = help_stats;
1125     else if (strcasecmp (command, "batch") == 0)
1126       help_text = help_batch;
1127     else
1128       help_text = help_help;
1129   }
1130
1131   add_response_info(sock, help_text[1]);
1132   return send_response(sock, RESP_OK, help_text[0]);
1133 } /* }}} int handle_request_help */
1134
1135 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1136 {
1137   uint64_t copy_queue_length;
1138   uint64_t copy_updates_received;
1139   uint64_t copy_flush_received;
1140   uint64_t copy_updates_written;
1141   uint64_t copy_data_sets_written;
1142   uint64_t copy_journal_bytes;
1143   uint64_t copy_journal_rotate;
1144
1145   uint64_t tree_nodes_number;
1146   uint64_t tree_depth;
1147
1148   pthread_mutex_lock (&stats_lock);
1149   copy_queue_length       = stats_queue_length;
1150   copy_updates_received   = stats_updates_received;
1151   copy_flush_received     = stats_flush_received;
1152   copy_updates_written    = stats_updates_written;
1153   copy_data_sets_written  = stats_data_sets_written;
1154   copy_journal_bytes      = stats_journal_bytes;
1155   copy_journal_rotate     = stats_journal_rotate;
1156   pthread_mutex_unlock (&stats_lock);
1157
1158   pthread_mutex_lock (&cache_lock);
1159   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1161   pthread_mutex_unlock (&cache_lock);
1162
1163   add_response_info(sock,
1164                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1165   add_response_info(sock,
1166                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167   add_response_info(sock,
1168                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169   add_response_info(sock,
1170                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171   add_response_info(sock,
1172                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1177
1178   send_response(sock, RESP_OK, "Statistics follow\n");
1179
1180   return (0);
1181 } /* }}} int handle_request_stats */
1182
1183 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1184     char *buffer, size_t buffer_size)
1185 {
1186   char *file;
1187   int status;
1188
1189   status = buffer_get_field (&buffer, &buffer_size, &file);
1190   if (status != 0)
1191   {
1192     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1193   }
1194   else
1195   {
1196     pthread_mutex_lock(&stats_lock);
1197     stats_flush_received++;
1198     pthread_mutex_unlock(&stats_lock);
1199
1200     if (!check_file_access(file, sock)) return 0;
1201
1202     status = flush_file (file);
1203     if (status == 0)
1204       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205     else if (status == ENOENT)
1206     {
1207       /* no file in our tree; see whether it exists at all */
1208       struct stat statbuf;
1209
1210       memset(&statbuf, 0, sizeof(statbuf));
1211       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213       else
1214         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215     }
1216     else if (status < 0)
1217       return send_response(sock, RESP_ERR, "Internal error.\n");
1218     else
1219       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220   }
1221
1222   /* NOTREACHED */
1223   assert(1==0);
1224 } /* }}} int handle_request_slurp */
1225
1226 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1227 {
1228   int status;
1229
1230   status = has_privilege(sock, PRIV_HIGH);
1231   if (status <= 0)
1232     return status;
1233
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1239
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1242
1243 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1244                                   char *buffer, size_t buffer_size)
1245 {
1246   int status;
1247   char *file;
1248   cache_item_t *ci;
1249
1250   status = buffer_get_field(&buffer, &buffer_size, &file);
1251   if (status != 0)
1252     return send_response(sock, RESP_ERR,
1253                          "Usage: PENDING <filename>\n");
1254
1255   status = has_privilege(sock, PRIV_HIGH);
1256   if (status <= 0)
1257     return status;
1258
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1266
1267   for (int i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1269
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1273
1274 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1275                                  char *buffer, size_t buffer_size)
1276 {
1277   int status;
1278   char *file;
1279
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return send_response(sock, RESP_ERR,
1283                          "Usage: FORGET <filename>\n");
1284
1285   status = has_privilege(sock, PRIV_HIGH);
1286   if (status <= 0)
1287     return status;
1288
1289   if (!check_file_access(file, sock)) return 0;
1290
1291   pthread_mutex_lock(&cache_lock);
1292   status = forget_file(file);
1293   pthread_mutex_unlock(&cache_lock);
1294
1295   if (status == 0)
1296   {
1297     if (sock != NULL)
1298       journal_write("forget", file);
1299
1300     return send_response(sock, RESP_OK, "Gone!\n");
1301   }
1302   else
1303     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1304                          status < 0 ? "Internal error" : rrd_strerror(status));
1305
1306   /* NOTREACHED */
1307   assert(1==0);
1308 } /* }}} static int handle_request_forget */
1309
1310 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1311     char *buffer, size_t buffer_size)
1312 {
1313   char *file;
1314   int values_num = 0;
1315   int bad_timestamps = 0;
1316   int status;
1317   char orig_buf[CMD_MAX];
1318
1319   time_t now;
1320   cache_item_t *ci;
1321
1322   now = time (NULL);
1323
1324   status = has_privilege(sock, PRIV_HIGH);
1325   if (status <= 0)
1326     return status;
1327
1328   /* save it for the journal later */
1329   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1330
1331   status = buffer_get_field (&buffer, &buffer_size, &file);
1332   if (status != 0)
1333     return send_response(sock, RESP_ERR,
1334                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1335
1336   pthread_mutex_lock(&stats_lock);
1337   stats_updates_received++;
1338   pthread_mutex_unlock(&stats_lock);
1339
1340   if (!check_file_access(file, sock)) return 0;
1341
1342   pthread_mutex_lock (&cache_lock);
1343   ci = g_tree_lookup (cache_tree, file);
1344
1345   if (ci == NULL) /* {{{ */
1346   {
1347     struct stat statbuf;
1348
1349     /* don't hold the lock while we setup; stat(2) might block */
1350     pthread_mutex_unlock(&cache_lock);
1351
1352     memset (&statbuf, 0, sizeof (statbuf));
1353     status = stat (file, &statbuf);
1354     if (status != 0)
1355     {
1356       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1357
1358       status = errno;
1359       if (status == ENOENT)
1360         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1361       else
1362         return send_response(sock, RESP_ERR,
1363                              "stat failed with error %i.\n", status);
1364     }
1365     if (!S_ISREG (statbuf.st_mode))
1366       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367
1368     if (access(file, R_OK|W_OK) != 0)
1369       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1370                            file, rrd_strerror(errno));
1371
1372     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1373     if (ci == NULL)
1374     {
1375       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376
1377       return send_response(sock, RESP_ERR, "malloc failed.\n");
1378     }
1379     memset (ci, 0, sizeof (cache_item_t));
1380
1381     ci->file = strdup (file);
1382     if (ci->file == NULL)
1383     {
1384       free (ci);
1385       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386
1387       return send_response(sock, RESP_ERR, "strdup failed.\n");
1388     }
1389
1390     wipe_ci_values(ci, now);
1391     ci->flags = CI_FLAGS_IN_TREE;
1392
1393     pthread_mutex_lock(&cache_lock);
1394     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1395   } /* }}} */
1396   assert (ci != NULL);
1397
1398   /* don't re-write updates in replay mode */
1399   if (sock != NULL)
1400     journal_write("update", orig_buf);
1401
1402   while (buffer_size > 0)
1403   {
1404     char **temp;
1405     char *value;
1406     time_t stamp;
1407     char *eostamp;
1408
1409     status = buffer_get_field (&buffer, &buffer_size, &value);
1410     if (status != 0)
1411     {
1412       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1413       break;
1414     }
1415
1416     /* make sure update time is always moving forward */
1417     stamp = strtol(value, &eostamp, 10);
1418     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1419     {
1420       ++bad_timestamps;
1421       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1422       continue;
1423     }
1424     else if (stamp <= ci->last_update_stamp)
1425     {
1426       ++bad_timestamps;
1427       add_response_info(sock,
1428                         "illegal attempt to update using time %ld when"
1429                         " last update time is %ld (minimum one second step)\n",
1430                         stamp, ci->last_update_stamp);
1431       continue;
1432     }
1433     else
1434       ci->last_update_stamp = stamp;
1435
1436     temp = (char **) realloc (ci->values,
1437         sizeof (char *) * (ci->values_num + 1));
1438     if (temp == NULL)
1439     {
1440       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1441       continue;
1442     }
1443     ci->values = temp;
1444
1445     ci->values[ci->values_num] = strdup (value);
1446     if (ci->values[ci->values_num] == NULL)
1447     {
1448       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1449       continue;
1450     }
1451     ci->values_num++;
1452
1453     values_num++;
1454   }
1455
1456   if (((now - ci->last_flush_time) >= config_write_interval)
1457       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1458       && (ci->values_num > 0))
1459   {
1460     enqueue_cache_item (ci, TAIL);
1461   }
1462
1463   pthread_mutex_unlock (&cache_lock);
1464
1465   if (values_num < 1)
1466   {
1467     /* if we had only one update attempt, then return the full
1468        error message... try to get the most information out
1469        of the limited error space allowed by the protocol
1470     */
1471     if (bad_timestamps == 1)
1472       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1473     else
1474       return send_response(sock, RESP_ERR,
1475                            "No values updated (%d bad timestamps).\n",
1476                            bad_timestamps);
1477   }
1478   else
1479     return send_response(sock, RESP_OK,
1480                          "errors, enqueued %i value(s).\n", values_num);
1481
1482   /* NOTREACHED */
1483   assert(1==0);
1484
1485 } /* }}} int handle_request_update */
1486
1487 /* we came across a "WROTE" entry during journal replay.
1488  * throw away any values that we have accumulated for this file
1489  */
1490 static int handle_request_wrote (const char *buffer) /* {{{ */
1491 {
1492   int i;
1493   cache_item_t *ci;
1494   const char *file = buffer;
1495
1496   pthread_mutex_lock(&cache_lock);
1497
1498   ci = g_tree_lookup(cache_tree, file);
1499   if (ci == NULL)
1500   {
1501     pthread_mutex_unlock(&cache_lock);
1502     return (0);
1503   }
1504
1505   if (ci->values)
1506   {
1507     for (i=0; i < ci->values_num; i++)
1508       free(ci->values[i]);
1509
1510     free(ci->values);
1511   }
1512
1513   wipe_ci_values(ci, time(NULL));
1514   remove_from_queue(ci);
1515
1516   pthread_mutex_unlock(&cache_lock);
1517   return (0);
1518 } /* }}} int handle_request_wrote */
1519
1520 /* start "BATCH" processing */
1521 static int batch_start (listen_socket_t *sock) /* {{{ */
1522 {
1523   int status;
1524   if (sock->batch_mode)
1525     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1526
1527   status = send_response(sock, RESP_OK,
1528                          "Go ahead.  End with dot '.' on its own line.\n");
1529   sock->batch_mode = 1;
1530   sock->batch_cmd = 0;
1531
1532   return status;
1533 } /* }}} static int batch_start */
1534
1535 /* finish "BATCH" processing and return results to the client */
1536 static int batch_done (listen_socket_t *sock) /* {{{ */
1537 {
1538   assert(sock->batch_mode);
1539   sock->batch_mode = 0;
1540   sock->batch_cmd  = 0;
1541   return send_response(sock, RESP_OK, "errors\n");
1542 } /* }}} static int batch_done */
1543
1544 /* if sock==NULL, we are in journal replay mode */
1545 static int handle_request (listen_socket_t *sock, /* {{{ */
1546                            char *buffer, size_t buffer_size)
1547 {
1548   char *buffer_ptr;
1549   char *command;
1550   int status;
1551
1552   assert (buffer[buffer_size - 1] == '\0');
1553
1554   buffer_ptr = buffer;
1555   command = NULL;
1556   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1557   if (status != 0)
1558   {
1559     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1560     return (-1);
1561   }
1562
1563   if (sock != NULL && sock->batch_mode)
1564     sock->batch_cmd++;
1565
1566   if (strcasecmp (command, "update") == 0)
1567     return (handle_request_update (sock, buffer_ptr, buffer_size));
1568   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1569   {
1570     /* this is only valid in replay mode */
1571     return (handle_request_wrote (buffer_ptr));
1572   }
1573   else if (strcasecmp (command, "flush") == 0)
1574     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1575   else if (strcasecmp (command, "flushall") == 0)
1576     return (handle_request_flushall(sock));
1577   else if (strcasecmp (command, "pending") == 0)
1578     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1579   else if (strcasecmp (command, "forget") == 0)
1580     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1581   else if (strcasecmp (command, "stats") == 0)
1582     return (handle_request_stats (sock));
1583   else if (strcasecmp (command, "help") == 0)
1584     return (handle_request_help (sock, buffer_ptr, buffer_size));
1585   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1586     return batch_start(sock);
1587   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1588     return batch_done(sock);
1589   else
1590     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1591
1592   /* NOTREACHED */
1593   assert(1==0);
1594 } /* }}} int handle_request */
1595
1596 /* MUST NOT hold journal_lock before calling this */
1597 static void journal_rotate(void) /* {{{ */
1598 {
1599   FILE *old_fh = NULL;
1600   int new_fd;
1601
1602   if (journal_cur == NULL || journal_old == NULL)
1603     return;
1604
1605   pthread_mutex_lock(&journal_lock);
1606
1607   /* we rotate this way (rename before close) so that the we can release
1608    * the journal lock as fast as possible.  Journal writes to the new
1609    * journal can proceed immediately after the new file is opened.  The
1610    * fclose can then block without affecting new updates.
1611    */
1612   if (journal_fh != NULL)
1613   {
1614     old_fh = journal_fh;
1615     journal_fh = NULL;
1616     rename(journal_cur, journal_old);
1617     ++stats_journal_rotate;
1618   }
1619
1620   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1621                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1622   if (new_fd >= 0)
1623   {
1624     journal_fh = fdopen(new_fd, "a");
1625     if (journal_fh == NULL)
1626       close(new_fd);
1627   }
1628
1629   pthread_mutex_unlock(&journal_lock);
1630
1631   if (old_fh != NULL)
1632     fclose(old_fh);
1633
1634   if (journal_fh == NULL)
1635   {
1636     RRDD_LOG(LOG_CRIT,
1637              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1638              journal_cur, rrd_strerror(errno));
1639
1640     RRDD_LOG(LOG_ERR,
1641              "JOURNALING DISABLED: All values will be flushed at shutdown");
1642     config_flush_at_shutdown = 1;
1643   }
1644
1645 } /* }}} static void journal_rotate */
1646
1647 static void journal_done(void) /* {{{ */
1648 {
1649   if (journal_cur == NULL)
1650     return;
1651
1652   pthread_mutex_lock(&journal_lock);
1653   if (journal_fh != NULL)
1654   {
1655     fclose(journal_fh);
1656     journal_fh = NULL;
1657   }
1658
1659   if (config_flush_at_shutdown)
1660   {
1661     RRDD_LOG(LOG_INFO, "removing journals");
1662     unlink(journal_old);
1663     unlink(journal_cur);
1664   }
1665   else
1666   {
1667     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1668              "journals will be used at next startup");
1669   }
1670
1671   pthread_mutex_unlock(&journal_lock);
1672
1673 } /* }}} static void journal_done */
1674
1675 static int journal_write(char *cmd, char *args) /* {{{ */
1676 {
1677   int chars;
1678
1679   if (journal_fh == NULL)
1680     return 0;
1681
1682   pthread_mutex_lock(&journal_lock);
1683   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1684   pthread_mutex_unlock(&journal_lock);
1685
1686   if (chars > 0)
1687   {
1688     pthread_mutex_lock(&stats_lock);
1689     stats_journal_bytes += chars;
1690     pthread_mutex_unlock(&stats_lock);
1691   }
1692
1693   return chars;
1694 } /* }}} static int journal_write */
1695
1696 static int journal_replay (const char *file) /* {{{ */
1697 {
1698   FILE *fh;
1699   int entry_cnt = 0;
1700   int fail_cnt = 0;
1701   uint64_t line = 0;
1702   char entry[CMD_MAX];
1703
1704   if (file == NULL) return 0;
1705
1706   {
1707     char *reason;
1708     int status = 0;
1709     struct stat statbuf;
1710
1711     memset(&statbuf, 0, sizeof(statbuf));
1712     if (stat(file, &statbuf) != 0)
1713     {
1714       if (errno == ENOENT)
1715         return 0;
1716
1717       reason = "stat error";
1718       status = errno;
1719     }
1720     else if (!S_ISREG(statbuf.st_mode))
1721     {
1722       reason = "not a regular file";
1723       status = EPERM;
1724     }
1725     if (statbuf.st_uid != daemon_uid)
1726     {
1727       reason = "not owned by daemon user";
1728       status = EACCES;
1729     }
1730     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1731     {
1732       reason = "must not be user/group writable";
1733       status = EACCES;
1734     }
1735
1736     if (status != 0)
1737     {
1738       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1739                file, rrd_strerror(status), reason);
1740       return 0;
1741     }
1742   }
1743
1744   fh = fopen(file, "r");
1745   if (fh == NULL)
1746   {
1747     if (errno != ENOENT)
1748       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1749                file, rrd_strerror(errno));
1750     return 0;
1751   }
1752   else
1753     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1754
1755   while(!feof(fh))
1756   {
1757     size_t entry_len;
1758
1759     ++line;
1760     if (fgets(entry, sizeof(entry), fh) == NULL)
1761       break;
1762     entry_len = strlen(entry);
1763
1764     /* check \n termination in case journal writing crashed mid-line */
1765     if (entry_len == 0)
1766       continue;
1767     else if (entry[entry_len - 1] != '\n')
1768     {
1769       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1770       ++fail_cnt;
1771       continue;
1772     }
1773
1774     entry[entry_len - 1] = '\0';
1775
1776     if (handle_request(NULL, entry, entry_len) == 0)
1777       ++entry_cnt;
1778     else
1779       ++fail_cnt;
1780   }
1781
1782   fclose(fh);
1783
1784   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1785            entry_cnt, fail_cnt);
1786
1787   return entry_cnt > 0 ? 1 : 0;
1788 } /* }}} static int journal_replay */
1789
1790 static void journal_init(void) /* {{{ */
1791 {
1792   int had_journal = 0;
1793
1794   if (journal_cur == NULL) return;
1795
1796   pthread_mutex_lock(&journal_lock);
1797
1798   RRDD_LOG(LOG_INFO, "checking for journal files");
1799
1800   had_journal += journal_replay(journal_old);
1801   had_journal += journal_replay(journal_cur);
1802
1803   /* it must have been a crash.  start a flush */
1804   if (had_journal && config_flush_at_shutdown)
1805     flush_old_values(-1);
1806
1807   pthread_mutex_unlock(&journal_lock);
1808   journal_rotate();
1809
1810   RRDD_LOG(LOG_INFO, "journal processing complete");
1811
1812 } /* }}} static void journal_init */
1813
1814 static void close_connection(listen_socket_t *sock)
1815 {
1816   close(sock->fd) ;  sock->fd   = -1;
1817   free(sock->rbuf);  sock->rbuf = NULL;
1818   free(sock->wbuf);  sock->wbuf = NULL;
1819
1820   free(sock);
1821 }
1822
1823 static void *connection_thread_main (void *args) /* {{{ */
1824 {
1825   pthread_t self;
1826   listen_socket_t *sock;
1827   int i;
1828   int fd;
1829
1830   sock = (listen_socket_t *) args;
1831   fd = sock->fd;
1832
1833   /* init read buffers */
1834   sock->next_read = sock->next_cmd = 0;
1835   sock->rbuf = malloc(RBUF_SIZE);
1836   if (sock->rbuf == NULL)
1837   {
1838     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1839     close_connection(sock);
1840     return NULL;
1841   }
1842
1843   pthread_mutex_lock (&connection_threads_lock);
1844   {
1845     pthread_t *temp;
1846
1847     temp = (pthread_t *) realloc (connection_threads,
1848         sizeof (pthread_t) * (connection_threads_num + 1));
1849     if (temp == NULL)
1850     {
1851       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1852     }
1853     else
1854     {
1855       connection_threads = temp;
1856       connection_threads[connection_threads_num] = pthread_self ();
1857       connection_threads_num++;
1858     }
1859   }
1860   pthread_mutex_unlock (&connection_threads_lock);
1861
1862   while (do_shutdown == 0)
1863   {
1864     char *cmd;
1865     ssize_t cmd_len;
1866     ssize_t rbytes;
1867
1868     struct pollfd pollfd;
1869     int status;
1870
1871     pollfd.fd = fd;
1872     pollfd.events = POLLIN | POLLPRI;
1873     pollfd.revents = 0;
1874
1875     status = poll (&pollfd, 1, /* timeout = */ 500);
1876     if (do_shutdown)
1877       break;
1878     else if (status == 0) /* timeout */
1879       continue;
1880     else if (status < 0) /* error */
1881     {
1882       status = errno;
1883       if (status != EINTR)
1884         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1885       continue;
1886     }
1887
1888     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1889       break;
1890     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1891     {
1892       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1893           "poll(2) returned something unexpected: %#04hx",
1894           pollfd.revents);
1895       break;
1896     }
1897
1898     rbytes = read(fd, sock->rbuf + sock->next_read,
1899                   RBUF_SIZE - sock->next_read);
1900     if (rbytes < 0)
1901     {
1902       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1903       break;
1904     }
1905     else if (rbytes == 0)
1906       break; /* eof */
1907
1908     sock->next_read += rbytes;
1909
1910     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1911     {
1912       status = handle_request (sock, cmd, cmd_len+1);
1913       if (status != 0)
1914         goto out_close;
1915     }
1916   }
1917
1918 out_close:
1919   close_connection(sock);
1920
1921   self = pthread_self ();
1922   /* Remove this thread from the connection threads list */
1923   pthread_mutex_lock (&connection_threads_lock);
1924   /* Find out own index in the array */
1925   for (i = 0; i < connection_threads_num; i++)
1926     if (pthread_equal (connection_threads[i], self) != 0)
1927       break;
1928   assert (i < connection_threads_num);
1929
1930   /* Move the trailing threads forward. */
1931   if (i < (connection_threads_num - 1))
1932   {
1933     memmove (connection_threads + i,
1934         connection_threads + i + 1,
1935         sizeof (pthread_t) * (connection_threads_num - i - 1));
1936   }
1937
1938   connection_threads_num--;
1939   pthread_mutex_unlock (&connection_threads_lock);
1940
1941   return (NULL);
1942 } /* }}} void *connection_thread_main */
1943
1944 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1945 {
1946   int fd;
1947   struct sockaddr_un sa;
1948   listen_socket_t *temp;
1949   int status;
1950   const char *path;
1951
1952   path = sock->addr;
1953   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1954     path += strlen("unix:");
1955
1956   temp = (listen_socket_t *) realloc (listen_fds,
1957       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1958   if (temp == NULL)
1959   {
1960     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1961     return (-1);
1962   }
1963   listen_fds = temp;
1964   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1965
1966   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1967   if (fd < 0)
1968   {
1969     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1970     return (-1);
1971   }
1972
1973   memset (&sa, 0, sizeof (sa));
1974   sa.sun_family = AF_UNIX;
1975   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1976
1977   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1978   if (status != 0)
1979   {
1980     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1981     close (fd);
1982     unlink (path);
1983     return (-1);
1984   }
1985
1986   status = listen (fd, /* backlog = */ 10);
1987   if (status != 0)
1988   {
1989     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1990     close (fd);
1991     unlink (path);
1992     return (-1);
1993   }
1994
1995   listen_fds[listen_fds_num].fd = fd;
1996   listen_fds[listen_fds_num].family = PF_UNIX;
1997   strncpy(listen_fds[listen_fds_num].addr, path,
1998           sizeof (listen_fds[listen_fds_num].addr) - 1);
1999   listen_fds_num++;
2000
2001   return (0);
2002 } /* }}} int open_listen_socket_unix */
2003
2004 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2005 {
2006   struct addrinfo ai_hints;
2007   struct addrinfo *ai_res;
2008   struct addrinfo *ai_ptr;
2009   char addr_copy[NI_MAXHOST];
2010   char *addr;
2011   char *port;
2012   int status;
2013
2014   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2015   addr_copy[sizeof (addr_copy) - 1] = 0;
2016   addr = addr_copy;
2017
2018   memset (&ai_hints, 0, sizeof (ai_hints));
2019   ai_hints.ai_flags = 0;
2020 #ifdef AI_ADDRCONFIG
2021   ai_hints.ai_flags |= AI_ADDRCONFIG;
2022 #endif
2023   ai_hints.ai_family = AF_UNSPEC;
2024   ai_hints.ai_socktype = SOCK_STREAM;
2025
2026   port = NULL;
2027   if (*addr == '[') /* IPv6+port format */
2028   {
2029     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2030     addr++;
2031
2032     port = strchr (addr, ']');
2033     if (port == NULL)
2034     {
2035       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2036           sock->addr);
2037       return (-1);
2038     }
2039     *port = 0;
2040     port++;
2041
2042     if (*port == ':')
2043       port++;
2044     else if (*port == 0)
2045       port = NULL;
2046     else
2047     {
2048       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2049           port);
2050       return (-1);
2051     }
2052   } /* if (*addr = ']') */
2053   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2054   {
2055     port = rindex(addr, ':');
2056     if (port != NULL)
2057     {
2058       *port = 0;
2059       port++;
2060     }
2061   }
2062   ai_res = NULL;
2063   status = getaddrinfo (addr,
2064                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2065                         &ai_hints, &ai_res);
2066   if (status != 0)
2067   {
2068     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2069         "%s", addr, gai_strerror (status));
2070     return (-1);
2071   }
2072
2073   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2074   {
2075     int fd;
2076     listen_socket_t *temp;
2077     int one = 1;
2078
2079     temp = (listen_socket_t *) realloc (listen_fds,
2080         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2081     if (temp == NULL)
2082     {
2083       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2084       continue;
2085     }
2086     listen_fds = temp;
2087     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2088
2089     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2090     if (fd < 0)
2091     {
2092       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2093       continue;
2094     }
2095
2096     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2097
2098     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2099     if (status != 0)
2100     {
2101       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2102       close (fd);
2103       continue;
2104     }
2105
2106     status = listen (fd, /* backlog = */ 10);
2107     if (status != 0)
2108     {
2109       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2110       close (fd);
2111       return (-1);
2112     }
2113
2114     listen_fds[listen_fds_num].fd = fd;
2115     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2116     listen_fds_num++;
2117   } /* for (ai_ptr) */
2118
2119   return (0);
2120 } /* }}} static int open_listen_socket_network */
2121
2122 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2123 {
2124   assert(sock != NULL);
2125   assert(sock->addr != NULL);
2126
2127   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2128       || sock->addr[0] == '/')
2129     return (open_listen_socket_unix(sock));
2130   else
2131     return (open_listen_socket_network(sock));
2132 } /* }}} int open_listen_socket */
2133
2134 static int close_listen_sockets (void) /* {{{ */
2135 {
2136   size_t i;
2137
2138   for (i = 0; i < listen_fds_num; i++)
2139   {
2140     close (listen_fds[i].fd);
2141
2142     if (listen_fds[i].family == PF_UNIX)
2143       unlink(listen_fds[i].addr);
2144   }
2145
2146   free (listen_fds);
2147   listen_fds = NULL;
2148   listen_fds_num = 0;
2149
2150   return (0);
2151 } /* }}} int close_listen_sockets */
2152
2153 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2154 {
2155   struct pollfd *pollfds;
2156   int pollfds_num;
2157   int status;
2158   int i;
2159
2160   for (i = 0; i < config_listen_address_list_len; i++)
2161     open_listen_socket (config_listen_address_list[i]);
2162
2163   if (config_listen_address_list_len < 1)
2164   {
2165     listen_socket_t sock;
2166     memset(&sock, 0, sizeof(sock));
2167     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2168     open_listen_socket (&sock);
2169   }
2170
2171   if (listen_fds_num < 1)
2172   {
2173     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2174         "could be opened. Sorry.");
2175     return (NULL);
2176   }
2177
2178   pollfds_num = listen_fds_num;
2179   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2180   if (pollfds == NULL)
2181   {
2182     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2183     return (NULL);
2184   }
2185   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2186
2187   RRDD_LOG(LOG_INFO, "listening for connections");
2188
2189   while (do_shutdown == 0)
2190   {
2191     assert (pollfds_num == ((int) listen_fds_num));
2192     for (i = 0; i < pollfds_num; i++)
2193     {
2194       pollfds[i].fd = listen_fds[i].fd;
2195       pollfds[i].events = POLLIN | POLLPRI;
2196       pollfds[i].revents = 0;
2197     }
2198
2199     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2200     if (do_shutdown)
2201       break;
2202     else if (status == 0) /* timeout */
2203       continue;
2204     else if (status < 0) /* error */
2205     {
2206       status = errno;
2207       if (status != EINTR)
2208       {
2209         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2210       }
2211       continue;
2212     }
2213
2214     for (i = 0; i < pollfds_num; i++)
2215     {
2216       listen_socket_t *client_sock;
2217       struct sockaddr_storage client_sa;
2218       socklen_t client_sa_size;
2219       pthread_t tid;
2220       pthread_attr_t attr;
2221
2222       if (pollfds[i].revents == 0)
2223         continue;
2224
2225       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2226       {
2227         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2228             "poll(2) returned something unexpected for listen FD #%i.",
2229             pollfds[i].fd);
2230         continue;
2231       }
2232
2233       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2234       if (client_sock == NULL)
2235       {
2236         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2237         continue;
2238       }
2239       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2240
2241       client_sa_size = sizeof (client_sa);
2242       client_sock->fd = accept (pollfds[i].fd,
2243           (struct sockaddr *) &client_sa, &client_sa_size);
2244       if (client_sock->fd < 0)
2245       {
2246         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2247         free(client_sock);
2248         continue;
2249       }
2250
2251       pthread_attr_init (&attr);
2252       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2253
2254       status = pthread_create (&tid, &attr, connection_thread_main,
2255                                client_sock);
2256       if (status != 0)
2257       {
2258         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2259         close_connection(client_sock);
2260         continue;
2261       }
2262     } /* for (pollfds_num) */
2263   } /* while (do_shutdown == 0) */
2264
2265   RRDD_LOG(LOG_INFO, "starting shutdown");
2266
2267   close_listen_sockets ();
2268
2269   pthread_mutex_lock (&connection_threads_lock);
2270   while (connection_threads_num > 0)
2271   {
2272     pthread_t wait_for;
2273
2274     wait_for = connection_threads[0];
2275
2276     pthread_mutex_unlock (&connection_threads_lock);
2277     pthread_join (wait_for, /* retval = */ NULL);
2278     pthread_mutex_lock (&connection_threads_lock);
2279   }
2280   pthread_mutex_unlock (&connection_threads_lock);
2281
2282   return (NULL);
2283 } /* }}} void *listen_thread_main */
2284
2285 static int daemonize (void) /* {{{ */
2286 {
2287   int status;
2288   int fd;
2289   char *base_dir;
2290
2291   daemon_uid = geteuid();
2292
2293   fd = open_pidfile();
2294   if (fd < 0) return fd;
2295
2296   if (!stay_foreground)
2297   {
2298     pid_t child;
2299
2300     child = fork ();
2301     if (child < 0)
2302     {
2303       fprintf (stderr, "daemonize: fork(2) failed.\n");
2304       return (-1);
2305     }
2306     else if (child > 0)
2307     {
2308       return (1);
2309     }
2310
2311     /* Become session leader */
2312     setsid ();
2313
2314     /* Open the first three file descriptors to /dev/null */
2315     close (2);
2316     close (1);
2317     close (0);
2318
2319     open ("/dev/null", O_RDWR);
2320     dup (0);
2321     dup (0);
2322   } /* if (!stay_foreground) */
2323
2324   /* Change into the /tmp directory. */
2325   base_dir = (config_base_dir != NULL)
2326     ? config_base_dir
2327     : "/tmp";
2328   status = chdir (base_dir);
2329   if (status != 0)
2330   {
2331     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2332     return (-1);
2333   }
2334
2335   install_signal_handlers();
2336
2337   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2338   RRDD_LOG(LOG_INFO, "starting up");
2339
2340   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2341   if (cache_tree == NULL)
2342   {
2343     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2344     return (-1);
2345   }
2346
2347   status = write_pidfile (fd);
2348   return status;
2349 } /* }}} int daemonize */
2350
2351 static int cleanup (void) /* {{{ */
2352 {
2353   do_shutdown++;
2354
2355   pthread_cond_signal (&cache_cond);
2356   pthread_join (queue_thread, /* return = */ NULL);
2357
2358   remove_pidfile ();
2359
2360   RRDD_LOG(LOG_INFO, "goodbye");
2361   closelog ();
2362
2363   return (0);
2364 } /* }}} int cleanup */
2365
2366 static int read_options (int argc, char **argv) /* {{{ */
2367 {
2368   int option;
2369   int status = 0;
2370
2371   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2372   {
2373     switch (option)
2374     {
2375       case 'g':
2376         stay_foreground=1;
2377         break;
2378
2379       case 'L':
2380       case 'l':
2381       {
2382         listen_socket_t **temp;
2383         listen_socket_t *new;
2384
2385         new = malloc(sizeof(listen_socket_t));
2386         if (new == NULL)
2387         {
2388           fprintf(stderr, "read_options: malloc failed.\n");
2389           return(2);
2390         }
2391         memset(new, 0, sizeof(listen_socket_t));
2392
2393         temp = (listen_socket_t **) realloc (config_listen_address_list,
2394             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2395         if (temp == NULL)
2396         {
2397           fprintf (stderr, "read_options: realloc failed.\n");
2398           return (2);
2399         }
2400         config_listen_address_list = temp;
2401
2402         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2403         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2404
2405         temp[config_listen_address_list_len] = new;
2406         config_listen_address_list_len++;
2407       }
2408       break;
2409
2410       case 'f':
2411       {
2412         int temp;
2413
2414         temp = atoi (optarg);
2415         if (temp > 0)
2416           config_flush_interval = temp;
2417         else
2418         {
2419           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2420           status = 3;
2421         }
2422       }
2423       break;
2424
2425       case 'w':
2426       {
2427         int temp;
2428
2429         temp = atoi (optarg);
2430         if (temp > 0)
2431           config_write_interval = temp;
2432         else
2433         {
2434           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2435           status = 2;
2436         }
2437       }
2438       break;
2439
2440       case 'z':
2441       {
2442         int temp;
2443
2444         temp = atoi(optarg);
2445         if (temp > 0)
2446           config_write_jitter = temp;
2447         else
2448         {
2449           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2450           status = 2;
2451         }
2452
2453         break;
2454       }
2455
2456       case 'B':
2457         config_write_base_only = 1;
2458         break;
2459
2460       case 'b':
2461       {
2462         size_t len;
2463
2464         if (config_base_dir != NULL)
2465           free (config_base_dir);
2466         config_base_dir = strdup (optarg);
2467         if (config_base_dir == NULL)
2468         {
2469           fprintf (stderr, "read_options: strdup failed.\n");
2470           return (3);
2471         }
2472
2473         len = strlen (config_base_dir);
2474         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2475         {
2476           config_base_dir[len - 1] = 0;
2477           len--;
2478         }
2479
2480         if (len < 1)
2481         {
2482           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2483           return (4);
2484         }
2485
2486         _config_base_dir_len = len;
2487       }
2488       break;
2489
2490       case 'p':
2491       {
2492         if (config_pid_file != NULL)
2493           free (config_pid_file);
2494         config_pid_file = strdup (optarg);
2495         if (config_pid_file == NULL)
2496         {
2497           fprintf (stderr, "read_options: strdup failed.\n");
2498           return (3);
2499         }
2500       }
2501       break;
2502
2503       case 'F':
2504         config_flush_at_shutdown = 1;
2505         break;
2506
2507       case 'j':
2508       {
2509         struct stat statbuf;
2510         const char *dir = optarg;
2511
2512         status = stat(dir, &statbuf);
2513         if (status != 0)
2514         {
2515           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2516           return 6;
2517         }
2518
2519         if (!S_ISDIR(statbuf.st_mode)
2520             || access(dir, R_OK|W_OK|X_OK) != 0)
2521         {
2522           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2523                   errno ? rrd_strerror(errno) : "");
2524           return 6;
2525         }
2526
2527         journal_cur = malloc(PATH_MAX + 1);
2528         journal_old = malloc(PATH_MAX + 1);
2529         if (journal_cur == NULL || journal_old == NULL)
2530         {
2531           fprintf(stderr, "malloc failure for journal files\n");
2532           return 6;
2533         }
2534         else 
2535         {
2536           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2537           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2538         }
2539       }
2540       break;
2541
2542       case 'h':
2543       case '?':
2544         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2545             "\n"
2546             "Usage: rrdcached [options]\n"
2547             "\n"
2548             "Valid options are:\n"
2549             "  -l <address>  Socket address to listen to.\n"
2550             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2551             "  -w <seconds>  Interval in which to write data.\n"
2552             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2553             "  -f <seconds>  Interval in which to flush dead data.\n"
2554             "  -p <file>     Location of the PID-file.\n"
2555             "  -b <dir>      Base directory to change to.\n"
2556             "  -B            Restrict file access to paths within -b <dir>\n"
2557             "  -g            Do not fork and run in the foreground.\n"
2558             "  -j <dir>      Directory in which to create the journal files.\n"
2559             "  -F            Always flush all updates at shutdown\n"
2560             "\n"
2561             "For more information and a detailed description of all options "
2562             "please refer\n"
2563             "to the rrdcached(1) manual page.\n",
2564             VERSION);
2565         status = -1;
2566         break;
2567     } /* switch (option) */
2568   } /* while (getopt) */
2569
2570   /* advise the user when values are not sane */
2571   if (config_flush_interval < 2 * config_write_interval)
2572     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2573             " 2x write interval (-w) !\n");
2574   if (config_write_jitter > config_write_interval)
2575     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2576             " write interval (-w) !\n");
2577
2578   if (config_write_base_only && config_base_dir == NULL)
2579     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2580             "  Consult the rrdcached documentation\n");
2581
2582   if (journal_cur == NULL)
2583     config_flush_at_shutdown = 1;
2584
2585   return (status);
2586 } /* }}} int read_options */
2587
2588 int main (int argc, char **argv)
2589 {
2590   int status;
2591
2592   status = read_options (argc, argv);
2593   if (status != 0)
2594   {
2595     if (status < 0)
2596       status = 0;
2597     return (status);
2598   }
2599
2600   status = daemonize ();
2601   if (status == 1)
2602   {
2603     struct sigaction sigchld;
2604
2605     memset (&sigchld, 0, sizeof (sigchld));
2606     sigchld.sa_handler = SIG_IGN;
2607     sigaction (SIGCHLD, &sigchld, NULL);
2608
2609     return (0);
2610   }
2611   else if (status != 0)
2612   {
2613     fprintf (stderr, "daemonize failed, exiting.\n");
2614     return (1);
2615   }
2616
2617   journal_init();
2618
2619   /* start the queue thread */
2620   memset (&queue_thread, 0, sizeof (queue_thread));
2621   status = pthread_create (&queue_thread,
2622                            NULL, /* attr */
2623                            queue_thread_main,
2624                            NULL); /* args */
2625   if (status != 0)
2626   {
2627     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2628     cleanup();
2629     return (1);
2630   }
2631
2632   listen_thread_main (NULL);
2633   cleanup ();
2634
2635   return (0);
2636 } /* int main */
2637
2638 /*
2639  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2640  */