clearer way of advancing the flush time
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
149
150 struct callback_flush_data_s
151 {
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
158
159 enum queue_side_e
160 {
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
165
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
169
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
175
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
178
179 static int do_shutdown = 0;
180
181 static pthread_t queue_thread;
182
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
186
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
193
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
202
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
205
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
223
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
233
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235 {
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
238
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240 {
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
243
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
249
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
255
256 static void install_signal_handlers(void) /* {{{ */
257 {
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
265
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
270
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
274
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
278
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
282
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
286
287 } /* }}} void install_signal_handlers */
288
289 static int open_pidfile(void) /* {{{ */
290 {
291   int fd;
292   char *file;
293
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
297
298   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301             file, rrd_strerror(errno));
302
303   return(fd);
304 } /* }}} static int open_pidfile */
305
306 static int write_pidfile (int fd) /* {{{ */
307 {
308   pid_t pid;
309   FILE *fh;
310
311   pid = getpid ();
312
313   fh = fdopen (fd, "w");
314   if (fh == NULL)
315   {
316     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
317     close(fd);
318     return (-1);
319   }
320
321   fprintf (fh, "%i\n", (int) pid);
322   fclose (fh);
323
324   return (0);
325 } /* }}} int write_pidfile */
326
327 static int remove_pidfile (void) /* {{{ */
328 {
329   char *file;
330   int status;
331
332   file = (config_pid_file != NULL)
333     ? config_pid_file
334     : LOCALSTATEDIR "/run/rrdcached.pid";
335
336   status = unlink (file);
337   if (status == 0)
338     return (0);
339   return (errno);
340 } /* }}} int remove_pidfile */
341
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
343 {
344   char *eol;
345
346   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347                sock->next_read - sock->next_cmd);
348
349   if (eol == NULL)
350   {
351     /* no commands left, move remainder back to front of rbuf */
352     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353             sock->next_read - sock->next_cmd);
354     sock->next_read -= sock->next_cmd;
355     sock->next_cmd = 0;
356     *len = 0;
357     return NULL;
358   }
359   else
360   {
361     char *cmd = sock->rbuf + sock->next_cmd;
362     *eol = '\0';
363
364     sock->next_cmd = eol - sock->rbuf + 1;
365
366     if (eol > sock->rbuf && *(eol-1) == '\r')
367       *(--eol) = '\0'; /* handle "\r\n" EOL */
368
369     *len = eol - cmd;
370
371     return cmd;
372   }
373
374   /* NOTREACHED */
375   assert(1==0);
376 }
377
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
380 {
381   char *new_buf;
382
383   assert(sock != NULL);
384
385   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
386   if (new_buf == NULL)
387   {
388     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
389     return -1;
390   }
391
392   strncpy(new_buf + sock->wbuf_len, str, len + 1);
393
394   sock->wbuf = new_buf;
395   sock->wbuf_len += len;
396
397   return 0;
398 } /* }}} static int add_to_wbuf */
399
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
402 {
403   va_list argp;
404   char buffer[CMD_MAX];
405   int len;
406
407   if (sock == NULL) return 0; /* journal replay mode */
408   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
409
410   va_start(argp, fmt);
411 #ifdef HAVE_VSNPRINTF
412   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
413 #else
414   len = vsprintf(buffer, fmt, argp);
415 #endif
416   va_end(argp);
417   if (len < 0)
418   {
419     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
420     return -1;
421   }
422
423   return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
425
426 static int count_lines(char *str) /* {{{ */
427 {
428   int lines = 0;
429
430   if (str != NULL)
431   {
432     while ((str = strchr(str, '\n')) != NULL)
433     {
434       ++lines;
435       ++str;
436     }
437   }
438
439   return lines;
440 } /* }}} static int count_lines */
441
442 /* send the response back to the user.
443  * returns 0 on success, -1 on error
444  * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446                           char *fmt, ...) /* {{{ */
447 {
448   va_list argp;
449   char buffer[CMD_MAX];
450   int lines;
451   ssize_t wrote;
452   int rclen, len;
453
454   if (sock == NULL) return rc;  /* journal replay mode */
455
456   if (sock->batch_start)
457   {
458     if (rc == RESP_OK)
459       return rc; /* no response on success during BATCH */
460     lines = sock->batch_cmd;
461   }
462   else if (rc == RESP_OK)
463     lines = count_lines(sock->wbuf);
464   else
465     lines = -1;
466
467   rclen = sprintf(buffer, "%d ", lines);
468   va_start(argp, fmt);
469 #ifdef HAVE_VSNPRINTF
470   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
471 #else
472   len = vsprintf(buffer+rclen, fmt, argp);
473 #endif
474   va_end(argp);
475   if (len < 0)
476     return -1;
477
478   len += rclen;
479
480   /* append the result to the wbuf, don't write to the user */
481   if (sock->batch_start)
482     return add_to_wbuf(sock, buffer, len);
483
484   /* first write must be complete */
485   if (len != write(sock->fd, buffer, len))
486   {
487     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
488     return -1;
489   }
490
491   if (sock->wbuf != NULL && rc == RESP_OK)
492   {
493     wrote = 0;
494     while (wrote < sock->wbuf_len)
495     {
496       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
497       if (wb <= 0)
498       {
499         RRDD_LOG(LOG_INFO, "send_response: could not write results");
500         return -1;
501       }
502       wrote += wb;
503     }
504   }
505
506   free(sock->wbuf); sock->wbuf = NULL;
507   sock->wbuf_len = 0;
508
509   return 0;
510 } /* }}} */
511
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
513 {
514   ci->values = NULL;
515   ci->values_num = 0;
516
517   ci->last_flush_time = when;
518   if (config_write_jitter > 0)
519     ci->last_flush_time += (random() % config_write_jitter);
520 }
521
522 /* remove_from_queue
523  * remove a "cache_item_t" item from the queue.
524  * must hold 'cache_lock' when calling this
525  */
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
527 {
528   if (ci == NULL) return;
529
530   if (ci->prev == NULL)
531     cache_queue_head = ci->next; /* reset head */
532   else
533     ci->prev->next = ci->next;
534
535   if (ci->next == NULL)
536     cache_queue_tail = ci->prev; /* reset the tail */
537   else
538     ci->next->prev = ci->prev;
539
540   ci->next = ci->prev = NULL;
541   ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
543
544 /* remove an entry from the tree and free all its resources.
545  * must hold 'cache lock' while calling this.
546  * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
548 {
549   cache_item_t *ci;
550
551   ci = g_tree_lookup(cache_tree, file);
552   if (ci == NULL)
553     return ENOENT;
554
555   g_tree_remove (cache_tree, file);
556   remove_from_queue(ci);
557
558   for (int i=0; i < ci->values_num; i++)
559     free(ci->values[i]);
560
561   free (ci->values);
562   free (ci->file);
563
564   /* in case anyone is waiting */
565   pthread_cond_broadcast(&ci->flushed);
566
567   free (ci);
568
569   return 0;
570 } /* }}} static int forget_file */
571
572 /*
573  * enqueue_cache_item:
574  * `cache_lock' must be acquired before calling this function!
575  */
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
577     queue_side_t side)
578 {
579   if (ci == NULL)
580     return (-1);
581
582   if (ci->values_num == 0)
583     return (0);
584
585   if (side == HEAD)
586   {
587     if (cache_queue_head == ci)
588       return 0;
589
590     /* remove from the double linked list */
591     if (ci->flags & CI_FLAGS_IN_QUEUE)
592       remove_from_queue(ci);
593
594     ci->prev = NULL;
595     ci->next = cache_queue_head;
596     if (ci->next != NULL)
597       ci->next->prev = ci;
598     cache_queue_head = ci;
599
600     if (cache_queue_tail == NULL)
601       cache_queue_tail = cache_queue_head;
602   }
603   else /* (side == TAIL) */
604   {
605     /* We don't move values back in the list.. */
606     if (ci->flags & CI_FLAGS_IN_QUEUE)
607       return (0);
608
609     assert (ci->next == NULL);
610     assert (ci->prev == NULL);
611
612     ci->prev = cache_queue_tail;
613
614     if (cache_queue_tail == NULL)
615       cache_queue_head = ci;
616     else
617       cache_queue_tail->next = ci;
618
619     cache_queue_tail = ci;
620   }
621
622   ci->flags |= CI_FLAGS_IN_QUEUE;
623
624   pthread_cond_broadcast(&cache_cond);
625   pthread_mutex_lock (&stats_lock);
626   stats_queue_length++;
627   pthread_mutex_unlock (&stats_lock);
628
629   return (0);
630 } /* }}} int enqueue_cache_item */
631
632 /*
633  * tree_callback_flush:
634  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635  * while this is in progress.
636  */
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
638     gpointer data)
639 {
640   cache_item_t *ci;
641   callback_flush_data_t *cfd;
642
643   ci = (cache_item_t *) value;
644   cfd = (callback_flush_data_t *) data;
645
646   if ((ci->last_flush_time <= cfd->abs_timeout)
647       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648       && (ci->values_num > 0))
649   {
650     enqueue_cache_item (ci, TAIL);
651   }
652   else if ((do_shutdown != 0)
653       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654       && (ci->values_num > 0))
655   {
656     enqueue_cache_item (ci, TAIL);
657   }
658   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660       && (ci->values_num <= 0))
661   {
662     char **temp;
663
664     temp = (char **) realloc (cfd->keys,
665         sizeof (char *) * (cfd->keys_num + 1));
666     if (temp == NULL)
667     {
668       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
669       return (FALSE);
670     }
671     cfd->keys = temp;
672     /* Make really sure this points to the _same_ place */
673     assert ((char *) key == ci->file);
674     cfd->keys[cfd->keys_num] = (char *) key;
675     cfd->keys_num++;
676   }
677
678   return (FALSE);
679 } /* }}} gboolean tree_callback_flush */
680
681 static int flush_old_values (int max_age)
682 {
683   callback_flush_data_t cfd;
684   size_t k;
685
686   memset (&cfd, 0, sizeof (cfd));
687   /* Pass the current time as user data so that we don't need to call
688    * `time' for each node. */
689   cfd.now = time (NULL);
690   cfd.keys = NULL;
691   cfd.keys_num = 0;
692
693   if (max_age > 0)
694     cfd.abs_timeout = cfd.now - max_age;
695   else
696     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
697
698   /* `tree_callback_flush' will return the keys of all values that haven't
699    * been touched in the last `config_flush_interval' seconds in `cfd'.
700    * The char*'s in this array point to the same memory as ci->file, so we
701    * don't need to free them separately. */
702   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
703
704   for (k = 0; k < cfd.keys_num; k++)
705   {
706     /* should never fail, since we have held the cache_lock
707      * the entire time */
708     assert( forget_file(cfd.keys[k]) == 0 );
709   }
710
711   if (cfd.keys != NULL)
712   {
713     free (cfd.keys);
714     cfd.keys = NULL;
715   }
716
717   return (0);
718 } /* int flush_old_values */
719
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
721 {
722   struct timeval now;
723   struct timespec next_flush;
724   int final_flush = 0; /* make sure we only flush once on shutdown */
725
726   gettimeofday (&now, NULL);
727   next_flush.tv_sec = now.tv_sec + config_flush_interval;
728   next_flush.tv_nsec = 1000 * now.tv_usec;
729
730   pthread_mutex_lock (&cache_lock);
731   while ((do_shutdown == 0) || (cache_queue_head != NULL))
732   {
733     cache_item_t *ci;
734     char *file;
735     char **values;
736     int values_num;
737     int status;
738     int i;
739
740     /* First, check if it's time to do the cache flush. */
741     gettimeofday (&now, NULL);
742     if ((now.tv_sec > next_flush.tv_sec)
743         || ((now.tv_sec == next_flush.tv_sec)
744           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
745     {
746       /* Flush all values that haven't been written in the last
747        * `config_write_interval' seconds. */
748       flush_old_values (config_write_interval);
749
750       /* Determine the time of the next cache flush. */
751       next_flush.tv_sec =
752         now.tv_sec + next_flush.tv_sec % config_flush_interval;
753
754       /* unlock the cache while we rotate so we don't block incoming
755        * updates if the fsync() blocks on disk I/O */
756       pthread_mutex_unlock(&cache_lock);
757       journal_rotate();
758       pthread_mutex_lock(&cache_lock);
759     }
760
761     /* Now, check if there's something to store away. If not, wait until
762      * something comes in or it's time to do the cache flush.  if we are
763      * shutting down, do not wait around.  */
764     if (cache_queue_head == NULL && !do_shutdown)
765     {
766       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767       if ((status != 0) && (status != ETIMEDOUT))
768       {
769         RRDD_LOG (LOG_ERR, "queue_thread_main: "
770             "pthread_cond_timedwait returned %i.", status);
771       }
772     }
773
774     /* We're about to shut down */
775     if (do_shutdown != 0 && !final_flush++)
776     {
777       if (config_flush_at_shutdown)
778         flush_old_values (-1); /* flush everything */
779       else
780         break;
781     }
782
783     /* Check if a value has arrived. This may be NULL if we timed out or there
784      * was an interrupt such as a signal. */
785     if (cache_queue_head == NULL)
786       continue;
787
788     ci = cache_queue_head;
789
790     /* copy the relevant parts */
791     file = strdup (ci->file);
792     if (file == NULL)
793     {
794       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
795       continue;
796     }
797
798     assert(ci->values != NULL);
799     assert(ci->values_num > 0);
800
801     values = ci->values;
802     values_num = ci->values_num;
803
804     wipe_ci_values(ci, time(NULL));
805     remove_from_queue(ci);
806
807     pthread_mutex_lock (&stats_lock);
808     assert (stats_queue_length > 0);
809     stats_queue_length--;
810     pthread_mutex_unlock (&stats_lock);
811
812     pthread_mutex_unlock (&cache_lock);
813
814     rrd_clear_error ();
815     status = rrd_update_r (file, NULL, values_num, (void *) values);
816     if (status != 0)
817     {
818       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819           "rrd_update_r (%s) failed with status %i. (%s)",
820           file, status, rrd_get_error());
821     }
822
823     journal_write("wrote", file);
824     pthread_cond_broadcast(&ci->flushed);
825
826     for (i = 0; i < values_num; i++)
827       free (values[i]);
828
829     free(values);
830     free(file);
831
832     if (status == 0)
833     {
834       pthread_mutex_lock (&stats_lock);
835       stats_updates_written++;
836       stats_data_sets_written += values_num;
837       pthread_mutex_unlock (&stats_lock);
838     }
839
840     pthread_mutex_lock (&cache_lock);
841
842     /* We're about to shut down */
843     if (do_shutdown != 0 && !final_flush++)
844     {
845       if (config_flush_at_shutdown)
846           flush_old_values (-1); /* flush everything */
847       else
848         break;
849     }
850   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851   pthread_mutex_unlock (&cache_lock);
852
853   if (config_flush_at_shutdown)
854   {
855     assert(cache_queue_head == NULL);
856     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
857   }
858
859   journal_done();
860
861   return (NULL);
862 } /* }}} void *queue_thread_main */
863
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865     size_t *buffer_size_ret, char **field_ret)
866 {
867   char *buffer;
868   size_t buffer_pos;
869   size_t buffer_size;
870   char *field;
871   size_t field_size;
872   int status;
873
874   buffer = *buffer_ret;
875   buffer_pos = 0;
876   buffer_size = *buffer_size_ret;
877   field = *buffer_ret;
878   field_size = 0;
879
880   if (buffer_size <= 0)
881     return (-1);
882
883   /* This is ensured by `handle_request'. */
884   assert (buffer[buffer_size - 1] == '\0');
885
886   status = -1;
887   while (buffer_pos < buffer_size)
888   {
889     /* Check for end-of-field or end-of-buffer */
890     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
891     {
892       field[field_size] = 0;
893       field_size++;
894       buffer_pos++;
895       status = 0;
896       break;
897     }
898     /* Handle escaped characters. */
899     else if (buffer[buffer_pos] == '\\')
900     {
901       if (buffer_pos >= (buffer_size - 1))
902         break;
903       buffer_pos++;
904       field[field_size] = buffer[buffer_pos];
905       field_size++;
906       buffer_pos++;
907     }
908     /* Normal operation */ 
909     else
910     {
911       field[field_size] = buffer[buffer_pos];
912       field_size++;
913       buffer_pos++;
914     }
915   } /* while (buffer_pos < buffer_size) */
916
917   if (status != 0)
918     return (status);
919
920   *buffer_ret = buffer + buffer_pos;
921   *buffer_size_ret = buffer_size - buffer_pos;
922   *field_ret = field;
923
924   return (0);
925 } /* }}} int buffer_get_field */
926
927 /* if we're restricting writes to the base directory,
928  * check whether the file falls within the dir
929  * returns 1 if OK, otherwise 0
930  */
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
932 {
933   assert(file != NULL);
934
935   if (!config_write_base_only
936       || sock == NULL /* journal replay */
937       || config_base_dir == NULL)
938     return 1;
939
940   if (strstr(file, "../") != NULL) goto err;
941
942   /* relative paths without "../" are ok */
943   if (*file != '/') return 1;
944
945   /* file must be of the format base + "/" + <1+ char filename> */
946   if (strlen(file) < _config_base_dir_len + 2) goto err;
947   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948   if (*(file + _config_base_dir_len) != '/') goto err;
949
950   return 1;
951
952 err:
953   if (sock != NULL && sock->fd >= 0)
954     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
955
956   return 0;
957 } /* }}} static int check_file_access */
958
959 /* returns 1 if we have the required privilege level,
960  * otherwise issue an error to the user on sock */
961 static int has_privilege (listen_socket_t *sock, /* {{{ */
962                           socket_privilege priv)
963 {
964   if (sock == NULL) /* journal replay */
965     return 1;
966
967   if (sock->privilege >= priv)
968     return 1;
969
970   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
971 } /* }}} static int has_privilege */
972
973 static int flush_file (const char *filename) /* {{{ */
974 {
975   cache_item_t *ci;
976
977   pthread_mutex_lock (&cache_lock);
978
979   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
980   if (ci == NULL)
981   {
982     pthread_mutex_unlock (&cache_lock);
983     return (ENOENT);
984   }
985
986   if (ci->values_num > 0)
987   {
988     /* Enqueue at head */
989     enqueue_cache_item (ci, HEAD);
990     pthread_cond_wait(&ci->flushed, &cache_lock);
991   }
992
993   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
994    * may have been purged during our cond_wait() */
995
996   pthread_mutex_unlock(&cache_lock);
997
998   return (0);
999 } /* }}} int flush_file */
1000
1001 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1002     char *buffer, size_t buffer_size)
1003 {
1004   int status;
1005   char **help_text;
1006   char *command;
1007
1008   char *help_help[2] =
1009   {
1010     "Command overview\n"
1011     ,
1012     "HELP [<command>]\n"
1013     "FLUSH <filename>\n"
1014     "FLUSHALL\n"
1015     "PENDING <filename>\n"
1016     "FORGET <filename>\n"
1017     "UPDATE <filename> <values> [<values> ...]\n"
1018     "BATCH\n"
1019     "STATS\n"
1020   };
1021
1022   char *help_flush[2] =
1023   {
1024     "Help for FLUSH\n"
1025     ,
1026     "Usage: FLUSH <filename>\n"
1027     "\n"
1028     "Adds the given filename to the head of the update queue and returns\n"
1029     "after is has been dequeued.\n"
1030   };
1031
1032   char *help_flushall[2] =
1033   {
1034     "Help for FLUSHALL\n"
1035     ,
1036     "Usage: FLUSHALL\n"
1037     "\n"
1038     "Triggers writing of all pending updates.  Returns immediately.\n"
1039   };
1040
1041   char *help_pending[2] =
1042   {
1043     "Help for PENDING\n"
1044     ,
1045     "Usage: PENDING <filename>\n"
1046     "\n"
1047     "Shows any 'pending' updates for a file, in order.\n"
1048     "The updates shown have not yet been written to the underlying RRD file.\n"
1049   };
1050
1051   char *help_forget[2] =
1052   {
1053     "Help for FORGET\n"
1054     ,
1055     "Usage: FORGET <filename>\n"
1056     "\n"
1057     "Removes the file completely from the cache.\n"
1058     "Any pending updates for the file will be lost.\n"
1059   };
1060
1061   char *help_update[2] =
1062   {
1063     "Help for UPDATE\n"
1064     ,
1065     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1066     "\n"
1067     "Adds the given file to the internal cache if it is not yet known and\n"
1068     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1069     "for details.\n"
1070     "\n"
1071     "Each <values> has the following form:\n"
1072     "  <values> = <time>:<value>[:<value>[...]]\n"
1073     "See the rrdupdate(1) manpage for details.\n"
1074   };
1075
1076   char *help_stats[2] =
1077   {
1078     "Help for STATS\n"
1079     ,
1080     "Usage: STATS\n"
1081     "\n"
1082     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1083     "a description of the values.\n"
1084   };
1085
1086   char *help_batch[2] =
1087   {
1088     "Help for BATCH\n"
1089     ,
1090     "The 'BATCH' command permits the client to initiate a bulk load\n"
1091     "   of commands to rrdcached.\n"
1092     "\n"
1093     "Usage:\n"
1094     "\n"
1095     "    client: BATCH\n"
1096     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1097     "    client: command #1\n"
1098     "    client: command #2\n"
1099     "    client: ... and so on\n"
1100     "    client: .\n"
1101     "    server: 2 errors\n"
1102     "    server: 7 message for command #7\n"
1103     "    server: 9 message for command #9\n"
1104     "\n"
1105     "For more information, consult the rrdcached(1) documentation.\n"
1106   };
1107
1108   status = buffer_get_field (&buffer, &buffer_size, &command);
1109   if (status != 0)
1110     help_text = help_help;
1111   else
1112   {
1113     if (strcasecmp (command, "update") == 0)
1114       help_text = help_update;
1115     else if (strcasecmp (command, "flush") == 0)
1116       help_text = help_flush;
1117     else if (strcasecmp (command, "flushall") == 0)
1118       help_text = help_flushall;
1119     else if (strcasecmp (command, "pending") == 0)
1120       help_text = help_pending;
1121     else if (strcasecmp (command, "forget") == 0)
1122       help_text = help_forget;
1123     else if (strcasecmp (command, "stats") == 0)
1124       help_text = help_stats;
1125     else if (strcasecmp (command, "batch") == 0)
1126       help_text = help_batch;
1127     else
1128       help_text = help_help;
1129   }
1130
1131   add_response_info(sock, help_text[1]);
1132   return send_response(sock, RESP_OK, help_text[0]);
1133 } /* }}} int handle_request_help */
1134
1135 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1136 {
1137   uint64_t copy_queue_length;
1138   uint64_t copy_updates_received;
1139   uint64_t copy_flush_received;
1140   uint64_t copy_updates_written;
1141   uint64_t copy_data_sets_written;
1142   uint64_t copy_journal_bytes;
1143   uint64_t copy_journal_rotate;
1144
1145   uint64_t tree_nodes_number;
1146   uint64_t tree_depth;
1147
1148   pthread_mutex_lock (&stats_lock);
1149   copy_queue_length       = stats_queue_length;
1150   copy_updates_received   = stats_updates_received;
1151   copy_flush_received     = stats_flush_received;
1152   copy_updates_written    = stats_updates_written;
1153   copy_data_sets_written  = stats_data_sets_written;
1154   copy_journal_bytes      = stats_journal_bytes;
1155   copy_journal_rotate     = stats_journal_rotate;
1156   pthread_mutex_unlock (&stats_lock);
1157
1158   pthread_mutex_lock (&cache_lock);
1159   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1160   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1161   pthread_mutex_unlock (&cache_lock);
1162
1163   add_response_info(sock,
1164                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1165   add_response_info(sock,
1166                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1167   add_response_info(sock,
1168                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1169   add_response_info(sock,
1170                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1171   add_response_info(sock,
1172                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1173   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1174   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1175   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1176   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1177
1178   send_response(sock, RESP_OK, "Statistics follow\n");
1179
1180   return (0);
1181 } /* }}} int handle_request_stats */
1182
1183 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1184     char *buffer, size_t buffer_size)
1185 {
1186   char *file;
1187   int status;
1188
1189   status = buffer_get_field (&buffer, &buffer_size, &file);
1190   if (status != 0)
1191   {
1192     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1193   }
1194   else
1195   {
1196     pthread_mutex_lock(&stats_lock);
1197     stats_flush_received++;
1198     pthread_mutex_unlock(&stats_lock);
1199
1200     if (!check_file_access(file, sock)) return 0;
1201
1202     status = flush_file (file);
1203     if (status == 0)
1204       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1205     else if (status == ENOENT)
1206     {
1207       /* no file in our tree; see whether it exists at all */
1208       struct stat statbuf;
1209
1210       memset(&statbuf, 0, sizeof(statbuf));
1211       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1212         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1213       else
1214         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1215     }
1216     else if (status < 0)
1217       return send_response(sock, RESP_ERR, "Internal error.\n");
1218     else
1219       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1220   }
1221
1222   /* NOTREACHED */
1223   assert(1==0);
1224 } /* }}} int handle_request_flush */
1225
1226 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1227 {
1228   int status;
1229
1230   status = has_privilege(sock, PRIV_HIGH);
1231   if (status <= 0)
1232     return status;
1233
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1239
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1242
1243 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1244                                   char *buffer, size_t buffer_size)
1245 {
1246   int status;
1247   char *file;
1248   cache_item_t *ci;
1249
1250   status = buffer_get_field(&buffer, &buffer_size, &file);
1251   if (status != 0)
1252     return send_response(sock, RESP_ERR,
1253                          "Usage: PENDING <filename>\n");
1254
1255   status = has_privilege(sock, PRIV_HIGH);
1256   if (status <= 0)
1257     return status;
1258
1259   pthread_mutex_lock(&cache_lock);
1260   ci = g_tree_lookup(cache_tree, file);
1261   if (ci == NULL)
1262   {
1263     pthread_mutex_unlock(&cache_lock);
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265   }
1266
1267   for (int i=0; i < ci->values_num; i++)
1268     add_response_info(sock, "%s\n", ci->values[i]);
1269
1270   pthread_mutex_unlock(&cache_lock);
1271   return send_response(sock, RESP_OK, "updates pending\n");
1272 } /* }}} static int handle_request_pending */
1273
1274 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1275                                  char *buffer, size_t buffer_size)
1276 {
1277   int status;
1278   char *file;
1279
1280   status = buffer_get_field(&buffer, &buffer_size, &file);
1281   if (status != 0)
1282     return send_response(sock, RESP_ERR,
1283                          "Usage: FORGET <filename>\n");
1284
1285   status = has_privilege(sock, PRIV_HIGH);
1286   if (status <= 0)
1287     return status;
1288
1289   if (!check_file_access(file, sock)) return 0;
1290
1291   pthread_mutex_lock(&cache_lock);
1292   status = forget_file(file);
1293   pthread_mutex_unlock(&cache_lock);
1294
1295   if (status == 0)
1296   {
1297     if (sock != NULL)
1298       journal_write("forget", file);
1299
1300     return send_response(sock, RESP_OK, "Gone!\n");
1301   }
1302   else
1303     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1304                          status < 0 ? "Internal error" : rrd_strerror(status));
1305
1306   /* NOTREACHED */
1307   assert(1==0);
1308 } /* }}} static int handle_request_forget */
1309
1310 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1311                                   time_t now,
1312                                   char *buffer, size_t buffer_size)
1313 {
1314   char *file;
1315   int values_num = 0;
1316   int bad_timestamps = 0;
1317   int status;
1318   char orig_buf[CMD_MAX];
1319
1320   cache_item_t *ci;
1321
1322   status = has_privilege(sock, PRIV_HIGH);
1323   if (status <= 0)
1324     return status;
1325
1326   /* save it for the journal later */
1327   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1328
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return send_response(sock, RESP_ERR,
1332                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1333
1334   pthread_mutex_lock(&stats_lock);
1335   stats_updates_received++;
1336   pthread_mutex_unlock(&stats_lock);
1337
1338   if (!check_file_access(file, sock)) return 0;
1339
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1342
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346
1347     /* don't hold the lock while we setup; stat(2) might block */
1348     pthread_mutex_unlock(&cache_lock);
1349
1350     memset (&statbuf, 0, sizeof (statbuf));
1351     status = stat (file, &statbuf);
1352     if (status != 0)
1353     {
1354       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1355
1356       status = errno;
1357       if (status == ENOENT)
1358         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1359       else
1360         return send_response(sock, RESP_ERR,
1361                              "stat failed with error %i.\n", status);
1362     }
1363     if (!S_ISREG (statbuf.st_mode))
1364       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1365
1366     if (access(file, R_OK|W_OK) != 0)
1367       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1368                            file, rrd_strerror(errno));
1369
1370     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1371     if (ci == NULL)
1372     {
1373       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1374
1375       return send_response(sock, RESP_ERR, "malloc failed.\n");
1376     }
1377     memset (ci, 0, sizeof (cache_item_t));
1378
1379     ci->file = strdup (file);
1380     if (ci->file == NULL)
1381     {
1382       free (ci);
1383       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1384
1385       return send_response(sock, RESP_ERR, "strdup failed.\n");
1386     }
1387
1388     wipe_ci_values(ci, now);
1389     ci->flags = CI_FLAGS_IN_TREE;
1390
1391     pthread_mutex_lock(&cache_lock);
1392     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1393   } /* }}} */
1394   assert (ci != NULL);
1395
1396   /* don't re-write updates in replay mode */
1397   if (sock != NULL)
1398     journal_write("update", orig_buf);
1399
1400   while (buffer_size > 0)
1401   {
1402     char **temp;
1403     char *value;
1404     time_t stamp;
1405     char *eostamp;
1406
1407     status = buffer_get_field (&buffer, &buffer_size, &value);
1408     if (status != 0)
1409     {
1410       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1411       break;
1412     }
1413
1414     /* make sure update time is always moving forward */
1415     stamp = strtol(value, &eostamp, 10);
1416     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1417     {
1418       ++bad_timestamps;
1419       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1420       continue;
1421     }
1422     else if (stamp <= ci->last_update_stamp)
1423     {
1424       ++bad_timestamps;
1425       add_response_info(sock,
1426                         "illegal attempt to update using time %ld when"
1427                         " last update time is %ld (minimum one second step)\n",
1428                         stamp, ci->last_update_stamp);
1429       continue;
1430     }
1431     else
1432       ci->last_update_stamp = stamp;
1433
1434     temp = (char **) realloc (ci->values,
1435         sizeof (char *) * (ci->values_num + 1));
1436     if (temp == NULL)
1437     {
1438       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1439       continue;
1440     }
1441     ci->values = temp;
1442
1443     ci->values[ci->values_num] = strdup (value);
1444     if (ci->values[ci->values_num] == NULL)
1445     {
1446       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1447       continue;
1448     }
1449     ci->values_num++;
1450
1451     values_num++;
1452   }
1453
1454   if (((now - ci->last_flush_time) >= config_write_interval)
1455       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1456       && (ci->values_num > 0))
1457   {
1458     enqueue_cache_item (ci, TAIL);
1459   }
1460
1461   pthread_mutex_unlock (&cache_lock);
1462
1463   if (values_num < 1)
1464   {
1465     /* if we had only one update attempt, then return the full
1466        error message... try to get the most information out
1467        of the limited error space allowed by the protocol
1468     */
1469     if (bad_timestamps == 1)
1470       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1471     else
1472       return send_response(sock, RESP_ERR,
1473                            "No values updated (%d bad timestamps).\n",
1474                            bad_timestamps);
1475   }
1476   else
1477     return send_response(sock, RESP_OK,
1478                          "errors, enqueued %i value(s).\n", values_num);
1479
1480   /* NOTREACHED */
1481   assert(1==0);
1482
1483 } /* }}} int handle_request_update */
1484
1485 /* we came across a "WROTE" entry during journal replay.
1486  * throw away any values that we have accumulated for this file
1487  */
1488 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1489 {
1490   int i;
1491   cache_item_t *ci;
1492   const char *file = buffer;
1493
1494   pthread_mutex_lock(&cache_lock);
1495
1496   ci = g_tree_lookup(cache_tree, file);
1497   if (ci == NULL)
1498   {
1499     pthread_mutex_unlock(&cache_lock);
1500     return (0);
1501   }
1502
1503   if (ci->values)
1504   {
1505     for (i=0; i < ci->values_num; i++)
1506       free(ci->values[i]);
1507
1508     free(ci->values);
1509   }
1510
1511   wipe_ci_values(ci, now);
1512   remove_from_queue(ci);
1513
1514   pthread_mutex_unlock(&cache_lock);
1515   return (0);
1516 } /* }}} int handle_request_wrote */
1517
1518 /* start "BATCH" processing */
1519 static int batch_start (listen_socket_t *sock) /* {{{ */
1520 {
1521   int status;
1522   if (sock->batch_start)
1523     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1524
1525   status = send_response(sock, RESP_OK,
1526                          "Go ahead.  End with dot '.' on its own line.\n");
1527   sock->batch_start = time(NULL);
1528   sock->batch_cmd = 0;
1529
1530   return status;
1531 } /* }}} static int batch_start */
1532
1533 /* finish "BATCH" processing and return results to the client */
1534 static int batch_done (listen_socket_t *sock) /* {{{ */
1535 {
1536   assert(sock->batch_start);
1537   sock->batch_start = 0;
1538   sock->batch_cmd  = 0;
1539   return send_response(sock, RESP_OK, "errors\n");
1540 } /* }}} static int batch_done */
1541
1542 /* if sock==NULL, we are in journal replay mode */
1543 static int handle_request (listen_socket_t *sock, /* {{{ */
1544                            time_t now,
1545                            char *buffer, size_t buffer_size)
1546 {
1547   char *buffer_ptr;
1548   char *command;
1549   int status;
1550
1551   assert (buffer[buffer_size - 1] == '\0');
1552
1553   buffer_ptr = buffer;
1554   command = NULL;
1555   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1556   if (status != 0)
1557   {
1558     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1559     return (-1);
1560   }
1561
1562   if (sock != NULL && sock->batch_start)
1563     sock->batch_cmd++;
1564
1565   if (strcasecmp (command, "update") == 0)
1566     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1567   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1568   {
1569     /* this is only valid in replay mode */
1570     return (handle_request_wrote (buffer_ptr, now));
1571   }
1572   else if (strcasecmp (command, "flush") == 0)
1573     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1574   else if (strcasecmp (command, "flushall") == 0)
1575     return (handle_request_flushall(sock));
1576   else if (strcasecmp (command, "pending") == 0)
1577     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1578   else if (strcasecmp (command, "forget") == 0)
1579     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1580   else if (strcasecmp (command, "stats") == 0)
1581     return (handle_request_stats (sock));
1582   else if (strcasecmp (command, "help") == 0)
1583     return (handle_request_help (sock, buffer_ptr, buffer_size));
1584   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1585     return batch_start(sock);
1586   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1587     return batch_done(sock);
1588   else
1589     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1590
1591   /* NOTREACHED */
1592   assert(1==0);
1593 } /* }}} int handle_request */
1594
1595 /* MUST NOT hold journal_lock before calling this */
1596 static void journal_rotate(void) /* {{{ */
1597 {
1598   FILE *old_fh = NULL;
1599   int new_fd;
1600
1601   if (journal_cur == NULL || journal_old == NULL)
1602     return;
1603
1604   pthread_mutex_lock(&journal_lock);
1605
1606   /* we rotate this way (rename before close) so that the we can release
1607    * the journal lock as fast as possible.  Journal writes to the new
1608    * journal can proceed immediately after the new file is opened.  The
1609    * fclose can then block without affecting new updates.
1610    */
1611   if (journal_fh != NULL)
1612   {
1613     old_fh = journal_fh;
1614     journal_fh = NULL;
1615     rename(journal_cur, journal_old);
1616     ++stats_journal_rotate;
1617   }
1618
1619   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1620                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1621   if (new_fd >= 0)
1622   {
1623     journal_fh = fdopen(new_fd, "a");
1624     if (journal_fh == NULL)
1625       close(new_fd);
1626   }
1627
1628   pthread_mutex_unlock(&journal_lock);
1629
1630   if (old_fh != NULL)
1631     fclose(old_fh);
1632
1633   if (journal_fh == NULL)
1634   {
1635     RRDD_LOG(LOG_CRIT,
1636              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1637              journal_cur, rrd_strerror(errno));
1638
1639     RRDD_LOG(LOG_ERR,
1640              "JOURNALING DISABLED: All values will be flushed at shutdown");
1641     config_flush_at_shutdown = 1;
1642   }
1643
1644 } /* }}} static void journal_rotate */
1645
1646 static void journal_done(void) /* {{{ */
1647 {
1648   if (journal_cur == NULL)
1649     return;
1650
1651   pthread_mutex_lock(&journal_lock);
1652   if (journal_fh != NULL)
1653   {
1654     fclose(journal_fh);
1655     journal_fh = NULL;
1656   }
1657
1658   if (config_flush_at_shutdown)
1659   {
1660     RRDD_LOG(LOG_INFO, "removing journals");
1661     unlink(journal_old);
1662     unlink(journal_cur);
1663   }
1664   else
1665   {
1666     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1667              "journals will be used at next startup");
1668   }
1669
1670   pthread_mutex_unlock(&journal_lock);
1671
1672 } /* }}} static void journal_done */
1673
1674 static int journal_write(char *cmd, char *args) /* {{{ */
1675 {
1676   int chars;
1677
1678   if (journal_fh == NULL)
1679     return 0;
1680
1681   pthread_mutex_lock(&journal_lock);
1682   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1683   pthread_mutex_unlock(&journal_lock);
1684
1685   if (chars > 0)
1686   {
1687     pthread_mutex_lock(&stats_lock);
1688     stats_journal_bytes += chars;
1689     pthread_mutex_unlock(&stats_lock);
1690   }
1691
1692   return chars;
1693 } /* }}} static int journal_write */
1694
1695 static int journal_replay (const char *file) /* {{{ */
1696 {
1697   FILE *fh;
1698   int entry_cnt = 0;
1699   int fail_cnt = 0;
1700   uint64_t line = 0;
1701   char entry[CMD_MAX];
1702   time_t now;
1703
1704   if (file == NULL) return 0;
1705
1706   {
1707     char *reason;
1708     int status = 0;
1709     struct stat statbuf;
1710
1711     memset(&statbuf, 0, sizeof(statbuf));
1712     if (stat(file, &statbuf) != 0)
1713     {
1714       if (errno == ENOENT)
1715         return 0;
1716
1717       reason = "stat error";
1718       status = errno;
1719     }
1720     else if (!S_ISREG(statbuf.st_mode))
1721     {
1722       reason = "not a regular file";
1723       status = EPERM;
1724     }
1725     if (statbuf.st_uid != daemon_uid)
1726     {
1727       reason = "not owned by daemon user";
1728       status = EACCES;
1729     }
1730     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1731     {
1732       reason = "must not be user/group writable";
1733       status = EACCES;
1734     }
1735
1736     if (status != 0)
1737     {
1738       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1739                file, rrd_strerror(status), reason);
1740       return 0;
1741     }
1742   }
1743
1744   fh = fopen(file, "r");
1745   if (fh == NULL)
1746   {
1747     if (errno != ENOENT)
1748       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1749                file, rrd_strerror(errno));
1750     return 0;
1751   }
1752   else
1753     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1754
1755   now = time(NULL);
1756
1757   while(!feof(fh))
1758   {
1759     size_t entry_len;
1760
1761     ++line;
1762     if (fgets(entry, sizeof(entry), fh) == NULL)
1763       break;
1764     entry_len = strlen(entry);
1765
1766     /* check \n termination in case journal writing crashed mid-line */
1767     if (entry_len == 0)
1768       continue;
1769     else if (entry[entry_len - 1] != '\n')
1770     {
1771       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1772       ++fail_cnt;
1773       continue;
1774     }
1775
1776     entry[entry_len - 1] = '\0';
1777
1778     if (handle_request(NULL, now, entry, entry_len) == 0)
1779       ++entry_cnt;
1780     else
1781       ++fail_cnt;
1782   }
1783
1784   fclose(fh);
1785
1786   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1787            entry_cnt, fail_cnt);
1788
1789   return entry_cnt > 0 ? 1 : 0;
1790 } /* }}} static int journal_replay */
1791
1792 static void journal_init(void) /* {{{ */
1793 {
1794   int had_journal = 0;
1795
1796   if (journal_cur == NULL) return;
1797
1798   pthread_mutex_lock(&journal_lock);
1799
1800   RRDD_LOG(LOG_INFO, "checking for journal files");
1801
1802   had_journal += journal_replay(journal_old);
1803   had_journal += journal_replay(journal_cur);
1804
1805   /* it must have been a crash.  start a flush */
1806   if (had_journal && config_flush_at_shutdown)
1807     flush_old_values(-1);
1808
1809   pthread_mutex_unlock(&journal_lock);
1810   journal_rotate();
1811
1812   RRDD_LOG(LOG_INFO, "journal processing complete");
1813
1814 } /* }}} static void journal_init */
1815
1816 static void close_connection(listen_socket_t *sock)
1817 {
1818   close(sock->fd) ;  sock->fd   = -1;
1819   free(sock->rbuf);  sock->rbuf = NULL;
1820   free(sock->wbuf);  sock->wbuf = NULL;
1821
1822   free(sock);
1823 }
1824
1825 static void *connection_thread_main (void *args) /* {{{ */
1826 {
1827   pthread_t self;
1828   listen_socket_t *sock;
1829   int i;
1830   int fd;
1831
1832   sock = (listen_socket_t *) args;
1833   fd = sock->fd;
1834
1835   /* init read buffers */
1836   sock->next_read = sock->next_cmd = 0;
1837   sock->rbuf = malloc(RBUF_SIZE);
1838   if (sock->rbuf == NULL)
1839   {
1840     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1841     close_connection(sock);
1842     return NULL;
1843   }
1844
1845   pthread_mutex_lock (&connection_threads_lock);
1846   {
1847     pthread_t *temp;
1848
1849     temp = (pthread_t *) realloc (connection_threads,
1850         sizeof (pthread_t) * (connection_threads_num + 1));
1851     if (temp == NULL)
1852     {
1853       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1854     }
1855     else
1856     {
1857       connection_threads = temp;
1858       connection_threads[connection_threads_num] = pthread_self ();
1859       connection_threads_num++;
1860     }
1861   }
1862   pthread_mutex_unlock (&connection_threads_lock);
1863
1864   while (do_shutdown == 0)
1865   {
1866     char *cmd;
1867     ssize_t cmd_len;
1868     ssize_t rbytes;
1869     time_t now;
1870
1871     struct pollfd pollfd;
1872     int status;
1873
1874     pollfd.fd = fd;
1875     pollfd.events = POLLIN | POLLPRI;
1876     pollfd.revents = 0;
1877
1878     status = poll (&pollfd, 1, /* timeout = */ 500);
1879     if (do_shutdown)
1880       break;
1881     else if (status == 0) /* timeout */
1882       continue;
1883     else if (status < 0) /* error */
1884     {
1885       status = errno;
1886       if (status != EINTR)
1887         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1888       continue;
1889     }
1890
1891     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1892       break;
1893     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1894     {
1895       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1896           "poll(2) returned something unexpected: %#04hx",
1897           pollfd.revents);
1898       break;
1899     }
1900
1901     rbytes = read(fd, sock->rbuf + sock->next_read,
1902                   RBUF_SIZE - sock->next_read);
1903     if (rbytes < 0)
1904     {
1905       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1906       break;
1907     }
1908     else if (rbytes == 0)
1909       break; /* eof */
1910
1911     sock->next_read += rbytes;
1912
1913     if (sock->batch_start)
1914       now = sock->batch_start;
1915     else
1916       now = time(NULL);
1917
1918     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1919     {
1920       status = handle_request (sock, now, cmd, cmd_len+1);
1921       if (status != 0)
1922         goto out_close;
1923     }
1924   }
1925
1926 out_close:
1927   close_connection(sock);
1928
1929   self = pthread_self ();
1930   /* Remove this thread from the connection threads list */
1931   pthread_mutex_lock (&connection_threads_lock);
1932   /* Find out own index in the array */
1933   for (i = 0; i < connection_threads_num; i++)
1934     if (pthread_equal (connection_threads[i], self) != 0)
1935       break;
1936   assert (i < connection_threads_num);
1937
1938   /* Move the trailing threads forward. */
1939   if (i < (connection_threads_num - 1))
1940   {
1941     memmove (connection_threads + i,
1942         connection_threads + i + 1,
1943         sizeof (pthread_t) * (connection_threads_num - i - 1));
1944   }
1945
1946   connection_threads_num--;
1947   pthread_mutex_unlock (&connection_threads_lock);
1948
1949   return (NULL);
1950 } /* }}} void *connection_thread_main */
1951
1952 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1953 {
1954   int fd;
1955   struct sockaddr_un sa;
1956   listen_socket_t *temp;
1957   int status;
1958   const char *path;
1959
1960   path = sock->addr;
1961   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1962     path += strlen("unix:");
1963
1964   temp = (listen_socket_t *) realloc (listen_fds,
1965       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1966   if (temp == NULL)
1967   {
1968     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1969     return (-1);
1970   }
1971   listen_fds = temp;
1972   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1973
1974   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1975   if (fd < 0)
1976   {
1977     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1978     return (-1);
1979   }
1980
1981   memset (&sa, 0, sizeof (sa));
1982   sa.sun_family = AF_UNIX;
1983   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1984
1985   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1986   if (status != 0)
1987   {
1988     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1989     close (fd);
1990     unlink (path);
1991     return (-1);
1992   }
1993
1994   status = listen (fd, /* backlog = */ 10);
1995   if (status != 0)
1996   {
1997     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1998     close (fd);
1999     unlink (path);
2000     return (-1);
2001   }
2002
2003   listen_fds[listen_fds_num].fd = fd;
2004   listen_fds[listen_fds_num].family = PF_UNIX;
2005   strncpy(listen_fds[listen_fds_num].addr, path,
2006           sizeof (listen_fds[listen_fds_num].addr) - 1);
2007   listen_fds_num++;
2008
2009   return (0);
2010 } /* }}} int open_listen_socket_unix */
2011
2012 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2013 {
2014   struct addrinfo ai_hints;
2015   struct addrinfo *ai_res;
2016   struct addrinfo *ai_ptr;
2017   char addr_copy[NI_MAXHOST];
2018   char *addr;
2019   char *port;
2020   int status;
2021
2022   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2023   addr_copy[sizeof (addr_copy) - 1] = 0;
2024   addr = addr_copy;
2025
2026   memset (&ai_hints, 0, sizeof (ai_hints));
2027   ai_hints.ai_flags = 0;
2028 #ifdef AI_ADDRCONFIG
2029   ai_hints.ai_flags |= AI_ADDRCONFIG;
2030 #endif
2031   ai_hints.ai_family = AF_UNSPEC;
2032   ai_hints.ai_socktype = SOCK_STREAM;
2033
2034   port = NULL;
2035   if (*addr == '[') /* IPv6+port format */
2036   {
2037     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2038     addr++;
2039
2040     port = strchr (addr, ']');
2041     if (port == NULL)
2042     {
2043       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2044           sock->addr);
2045       return (-1);
2046     }
2047     *port = 0;
2048     port++;
2049
2050     if (*port == ':')
2051       port++;
2052     else if (*port == 0)
2053       port = NULL;
2054     else
2055     {
2056       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2057           port);
2058       return (-1);
2059     }
2060   } /* if (*addr = ']') */
2061   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2062   {
2063     port = rindex(addr, ':');
2064     if (port != NULL)
2065     {
2066       *port = 0;
2067       port++;
2068     }
2069   }
2070   ai_res = NULL;
2071   status = getaddrinfo (addr,
2072                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2073                         &ai_hints, &ai_res);
2074   if (status != 0)
2075   {
2076     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2077         "%s", addr, gai_strerror (status));
2078     return (-1);
2079   }
2080
2081   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2082   {
2083     int fd;
2084     listen_socket_t *temp;
2085     int one = 1;
2086
2087     temp = (listen_socket_t *) realloc (listen_fds,
2088         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2089     if (temp == NULL)
2090     {
2091       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2092       continue;
2093     }
2094     listen_fds = temp;
2095     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2096
2097     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2098     if (fd < 0)
2099     {
2100       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2101       continue;
2102     }
2103
2104     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2105
2106     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2107     if (status != 0)
2108     {
2109       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2110       close (fd);
2111       continue;
2112     }
2113
2114     status = listen (fd, /* backlog = */ 10);
2115     if (status != 0)
2116     {
2117       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2118       close (fd);
2119       return (-1);
2120     }
2121
2122     listen_fds[listen_fds_num].fd = fd;
2123     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2124     listen_fds_num++;
2125   } /* for (ai_ptr) */
2126
2127   return (0);
2128 } /* }}} static int open_listen_socket_network */
2129
2130 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2131 {
2132   assert(sock != NULL);
2133   assert(sock->addr != NULL);
2134
2135   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2136       || sock->addr[0] == '/')
2137     return (open_listen_socket_unix(sock));
2138   else
2139     return (open_listen_socket_network(sock));
2140 } /* }}} int open_listen_socket */
2141
2142 static int close_listen_sockets (void) /* {{{ */
2143 {
2144   size_t i;
2145
2146   for (i = 0; i < listen_fds_num; i++)
2147   {
2148     close (listen_fds[i].fd);
2149
2150     if (listen_fds[i].family == PF_UNIX)
2151       unlink(listen_fds[i].addr);
2152   }
2153
2154   free (listen_fds);
2155   listen_fds = NULL;
2156   listen_fds_num = 0;
2157
2158   return (0);
2159 } /* }}} int close_listen_sockets */
2160
2161 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2162 {
2163   struct pollfd *pollfds;
2164   int pollfds_num;
2165   int status;
2166   int i;
2167
2168   for (i = 0; i < config_listen_address_list_len; i++)
2169     open_listen_socket (config_listen_address_list[i]);
2170
2171   if (config_listen_address_list_len < 1)
2172   {
2173     listen_socket_t sock;
2174     memset(&sock, 0, sizeof(sock));
2175     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2176     open_listen_socket (&sock);
2177   }
2178
2179   if (listen_fds_num < 1)
2180   {
2181     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2182         "could be opened. Sorry.");
2183     return (NULL);
2184   }
2185
2186   pollfds_num = listen_fds_num;
2187   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2188   if (pollfds == NULL)
2189   {
2190     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2191     return (NULL);
2192   }
2193   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2194
2195   RRDD_LOG(LOG_INFO, "listening for connections");
2196
2197   while (do_shutdown == 0)
2198   {
2199     assert (pollfds_num == ((int) listen_fds_num));
2200     for (i = 0; i < pollfds_num; i++)
2201     {
2202       pollfds[i].fd = listen_fds[i].fd;
2203       pollfds[i].events = POLLIN | POLLPRI;
2204       pollfds[i].revents = 0;
2205     }
2206
2207     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2208     if (do_shutdown)
2209       break;
2210     else if (status == 0) /* timeout */
2211       continue;
2212     else if (status < 0) /* error */
2213     {
2214       status = errno;
2215       if (status != EINTR)
2216       {
2217         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2218       }
2219       continue;
2220     }
2221
2222     for (i = 0; i < pollfds_num; i++)
2223     {
2224       listen_socket_t *client_sock;
2225       struct sockaddr_storage client_sa;
2226       socklen_t client_sa_size;
2227       pthread_t tid;
2228       pthread_attr_t attr;
2229
2230       if (pollfds[i].revents == 0)
2231         continue;
2232
2233       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2234       {
2235         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2236             "poll(2) returned something unexpected for listen FD #%i.",
2237             pollfds[i].fd);
2238         continue;
2239       }
2240
2241       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2242       if (client_sock == NULL)
2243       {
2244         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2245         continue;
2246       }
2247       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2248
2249       client_sa_size = sizeof (client_sa);
2250       client_sock->fd = accept (pollfds[i].fd,
2251           (struct sockaddr *) &client_sa, &client_sa_size);
2252       if (client_sock->fd < 0)
2253       {
2254         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2255         free(client_sock);
2256         continue;
2257       }
2258
2259       pthread_attr_init (&attr);
2260       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2261
2262       status = pthread_create (&tid, &attr, connection_thread_main,
2263                                client_sock);
2264       if (status != 0)
2265       {
2266         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2267         close_connection(client_sock);
2268         continue;
2269       }
2270     } /* for (pollfds_num) */
2271   } /* while (do_shutdown == 0) */
2272
2273   RRDD_LOG(LOG_INFO, "starting shutdown");
2274
2275   close_listen_sockets ();
2276
2277   pthread_mutex_lock (&connection_threads_lock);
2278   while (connection_threads_num > 0)
2279   {
2280     pthread_t wait_for;
2281
2282     wait_for = connection_threads[0];
2283
2284     pthread_mutex_unlock (&connection_threads_lock);
2285     pthread_join (wait_for, /* retval = */ NULL);
2286     pthread_mutex_lock (&connection_threads_lock);
2287   }
2288   pthread_mutex_unlock (&connection_threads_lock);
2289
2290   return (NULL);
2291 } /* }}} void *listen_thread_main */
2292
2293 static int daemonize (void) /* {{{ */
2294 {
2295   int status;
2296   int fd;
2297   char *base_dir;
2298
2299   daemon_uid = geteuid();
2300
2301   fd = open_pidfile();
2302   if (fd < 0) return fd;
2303
2304   if (!stay_foreground)
2305   {
2306     pid_t child;
2307
2308     child = fork ();
2309     if (child < 0)
2310     {
2311       fprintf (stderr, "daemonize: fork(2) failed.\n");
2312       return (-1);
2313     }
2314     else if (child > 0)
2315     {
2316       return (1);
2317     }
2318
2319     /* Become session leader */
2320     setsid ();
2321
2322     /* Open the first three file descriptors to /dev/null */
2323     close (2);
2324     close (1);
2325     close (0);
2326
2327     open ("/dev/null", O_RDWR);
2328     dup (0);
2329     dup (0);
2330   } /* if (!stay_foreground) */
2331
2332   /* Change into the /tmp directory. */
2333   base_dir = (config_base_dir != NULL)
2334     ? config_base_dir
2335     : "/tmp";
2336   status = chdir (base_dir);
2337   if (status != 0)
2338   {
2339     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2340     return (-1);
2341   }
2342
2343   install_signal_handlers();
2344
2345   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2346   RRDD_LOG(LOG_INFO, "starting up");
2347
2348   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2349   if (cache_tree == NULL)
2350   {
2351     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2352     return (-1);
2353   }
2354
2355   status = write_pidfile (fd);
2356   return status;
2357 } /* }}} int daemonize */
2358
2359 static int cleanup (void) /* {{{ */
2360 {
2361   do_shutdown++;
2362
2363   pthread_cond_signal (&cache_cond);
2364   pthread_join (queue_thread, /* return = */ NULL);
2365
2366   remove_pidfile ();
2367
2368   RRDD_LOG(LOG_INFO, "goodbye");
2369   closelog ();
2370
2371   return (0);
2372 } /* }}} int cleanup */
2373
2374 static int read_options (int argc, char **argv) /* {{{ */
2375 {
2376   int option;
2377   int status = 0;
2378
2379   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2380   {
2381     switch (option)
2382     {
2383       case 'g':
2384         stay_foreground=1;
2385         break;
2386
2387       case 'L':
2388       case 'l':
2389       {
2390         listen_socket_t **temp;
2391         listen_socket_t *new;
2392
2393         new = malloc(sizeof(listen_socket_t));
2394         if (new == NULL)
2395         {
2396           fprintf(stderr, "read_options: malloc failed.\n");
2397           return(2);
2398         }
2399         memset(new, 0, sizeof(listen_socket_t));
2400
2401         temp = (listen_socket_t **) realloc (config_listen_address_list,
2402             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2403         if (temp == NULL)
2404         {
2405           fprintf (stderr, "read_options: realloc failed.\n");
2406           return (2);
2407         }
2408         config_listen_address_list = temp;
2409
2410         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2411         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2412
2413         temp[config_listen_address_list_len] = new;
2414         config_listen_address_list_len++;
2415       }
2416       break;
2417
2418       case 'f':
2419       {
2420         int temp;
2421
2422         temp = atoi (optarg);
2423         if (temp > 0)
2424           config_flush_interval = temp;
2425         else
2426         {
2427           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2428           status = 3;
2429         }
2430       }
2431       break;
2432
2433       case 'w':
2434       {
2435         int temp;
2436
2437         temp = atoi (optarg);
2438         if (temp > 0)
2439           config_write_interval = temp;
2440         else
2441         {
2442           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2443           status = 2;
2444         }
2445       }
2446       break;
2447
2448       case 'z':
2449       {
2450         int temp;
2451
2452         temp = atoi(optarg);
2453         if (temp > 0)
2454           config_write_jitter = temp;
2455         else
2456         {
2457           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2458           status = 2;
2459         }
2460
2461         break;
2462       }
2463
2464       case 'B':
2465         config_write_base_only = 1;
2466         break;
2467
2468       case 'b':
2469       {
2470         size_t len;
2471
2472         if (config_base_dir != NULL)
2473           free (config_base_dir);
2474         config_base_dir = strdup (optarg);
2475         if (config_base_dir == NULL)
2476         {
2477           fprintf (stderr, "read_options: strdup failed.\n");
2478           return (3);
2479         }
2480
2481         len = strlen (config_base_dir);
2482         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2483         {
2484           config_base_dir[len - 1] = 0;
2485           len--;
2486         }
2487
2488         if (len < 1)
2489         {
2490           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2491           return (4);
2492         }
2493
2494         _config_base_dir_len = len;
2495       }
2496       break;
2497
2498       case 'p':
2499       {
2500         if (config_pid_file != NULL)
2501           free (config_pid_file);
2502         config_pid_file = strdup (optarg);
2503         if (config_pid_file == NULL)
2504         {
2505           fprintf (stderr, "read_options: strdup failed.\n");
2506           return (3);
2507         }
2508       }
2509       break;
2510
2511       case 'F':
2512         config_flush_at_shutdown = 1;
2513         break;
2514
2515       case 'j':
2516       {
2517         struct stat statbuf;
2518         const char *dir = optarg;
2519
2520         status = stat(dir, &statbuf);
2521         if (status != 0)
2522         {
2523           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2524           return 6;
2525         }
2526
2527         if (!S_ISDIR(statbuf.st_mode)
2528             || access(dir, R_OK|W_OK|X_OK) != 0)
2529         {
2530           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2531                   errno ? rrd_strerror(errno) : "");
2532           return 6;
2533         }
2534
2535         journal_cur = malloc(PATH_MAX + 1);
2536         journal_old = malloc(PATH_MAX + 1);
2537         if (journal_cur == NULL || journal_old == NULL)
2538         {
2539           fprintf(stderr, "malloc failure for journal files\n");
2540           return 6;
2541         }
2542         else 
2543         {
2544           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2545           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2546         }
2547       }
2548       break;
2549
2550       case 'h':
2551       case '?':
2552         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2553             "\n"
2554             "Usage: rrdcached [options]\n"
2555             "\n"
2556             "Valid options are:\n"
2557             "  -l <address>  Socket address to listen to.\n"
2558             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2559             "  -w <seconds>  Interval in which to write data.\n"
2560             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2561             "  -f <seconds>  Interval in which to flush dead data.\n"
2562             "  -p <file>     Location of the PID-file.\n"
2563             "  -b <dir>      Base directory to change to.\n"
2564             "  -B            Restrict file access to paths within -b <dir>\n"
2565             "  -g            Do not fork and run in the foreground.\n"
2566             "  -j <dir>      Directory in which to create the journal files.\n"
2567             "  -F            Always flush all updates at shutdown\n"
2568             "\n"
2569             "For more information and a detailed description of all options "
2570             "please refer\n"
2571             "to the rrdcached(1) manual page.\n",
2572             VERSION);
2573         status = -1;
2574         break;
2575     } /* switch (option) */
2576   } /* while (getopt) */
2577
2578   /* advise the user when values are not sane */
2579   if (config_flush_interval < 2 * config_write_interval)
2580     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2581             " 2x write interval (-w) !\n");
2582   if (config_write_jitter > config_write_interval)
2583     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2584             " write interval (-w) !\n");
2585
2586   if (config_write_base_only && config_base_dir == NULL)
2587     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2588             "  Consult the rrdcached documentation\n");
2589
2590   if (journal_cur == NULL)
2591     config_flush_at_shutdown = 1;
2592
2593   return (status);
2594 } /* }}} int read_options */
2595
2596 int main (int argc, char **argv)
2597 {
2598   int status;
2599
2600   status = read_options (argc, argv);
2601   if (status != 0)
2602   {
2603     if (status < 0)
2604       status = 0;
2605     return (status);
2606   }
2607
2608   status = daemonize ();
2609   if (status == 1)
2610   {
2611     struct sigaction sigchld;
2612
2613     memset (&sigchld, 0, sizeof (sigchld));
2614     sigchld.sa_handler = SIG_IGN;
2615     sigaction (SIGCHLD, &sigchld, NULL);
2616
2617     return (0);
2618   }
2619   else if (status != 0)
2620   {
2621     fprintf (stderr, "daemonize failed, exiting.\n");
2622     return (1);
2623   }
2624
2625   journal_init();
2626
2627   /* start the queue thread */
2628   memset (&queue_thread, 0, sizeof (queue_thread));
2629   status = pthread_create (&queue_thread,
2630                            NULL, /* attr */
2631                            queue_thread_main,
2632                            NULL); /* args */
2633   if (status != 0)
2634   {
2635     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2636     cleanup();
2637     return (1);
2638   }
2639
2640   listen_thread_main (NULL);
2641   cleanup ();
2642
2643   return (0);
2644 } /* int main */
2645
2646 /*
2647  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2648  */